Skip to content

Commit

Permalink
async compatible with Python3.5
Browse files Browse the repository at this point in the history
  • Loading branch information
maartenbreddels committed Sep 9, 2019
1 parent 71479c9 commit 45d0368
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 58 deletions.
7 changes: 4 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ os:
- osx
env:
matrix:
- PYTHON_VERSION=3.5
- PYTHON_VERSION=3.6
- PYTHON_VERSION=3.7
before_install:
Expand All @@ -17,12 +18,12 @@ before_install:
- conda config --set always_yes yes --set changeps1 no
- conda update -q conda
- conda info -a
- conda create -q -n test-environment -c conda-forge python=$PYTHON_VERSION jupyter_server==0.1.0 jupyterlab_pygments==0.1.0 pytest==3.10.1 nbconvert=5.5 pytest-cov nodejs flake8 ipywidgets matplotlib xeus-cling
- conda create -q -n test-environment -c conda-forge python=$PYTHON_VERSION nodejs flake8 xeus-cling
- source activate test-environment
install:
- pip install ".[test]"
- pip uninstall jupyter_client
- pip install git+https://github.com/maartenbreddels/jupyter_client/tree/feat_async
- pip uninstall jupyter_client -y
- pip install git+https://github.com/maartenbreddels/jupyter_client@feat_async
- cd tests/test_template; pip install .; cd ../../;
after_script:
- flake8 voila tests setup.py
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ def get_data_files():
'jupyter_server>=0.1.0,<0.2.0',
'nbconvert>=5.5.0,<6',
'jupyterlab_pygments>=0.1.0,<0.2',
'async_generator',
'pygments>=2.4.1,<3' # Explicitly requiring pygments which is a second-order dependency.
# An older version is generally installed already and is otherwise not updated by pip.
],
Expand Down
2 changes: 2 additions & 0 deletions tests/app/execute_asyncio_test.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# test basics of voila running a notebook
import pytest
import sys


@pytest.fixture
def voila_args_extra():
return ['--VoilaExporter.cell_executor_class=voila.execute_asyncio.CellExecutorAsyncio']


@pytest.mark.skipif(sys.version_info[:2] < (3, 6), reason='Jinja with Python35 has no async generator support')
@pytest.mark.gen_test
def test_hello_world(app, http_client, base_url):
response = yield http_client.fetch(base_url)
Expand Down
2 changes: 2 additions & 0 deletions tests/app/execute_threaded_test.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# test basics of voila running a notebook
import pytest
import sys


@pytest.fixture
def voila_args_extra():
return ['--VoilaExporter.cell_executor_class=voila.execute_threaded.CellExecutorThreaded']


@pytest.mark.skipif(sys.version_info[:2] < (3, 6), reason='Jinja with Python35 has no async generator support')
@pytest.mark.gen_test
def test_hello_world(app, http_client, base_url):
response = yield http_client.fetch(base_url)
Expand Down
16 changes: 11 additions & 5 deletions voila/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,20 @@ def notebook_execute(self, nb, kernel_id):


class CellExecutorNbConvert(CellExecutor):
async def kernel_start(self):
"""Synchronous version of a cell executor, used for Python3.5"""
def kernel_start(self):
assert not self.kernel_started, "kernel was already started"
# Launch kernel
kernel_id = await tornado.gen.maybe_future(self.multi_kernel_manager.start_kernel(kernel_name=self.notebook.metadata.kernelspec.name, path=self.cwd))

@tornado.gen.coroutine
def wrapper():
# kernel_id = await tornado.gen.maybe_future(self.multi_kernel_manager.start_kernel(kernel_name=self.notebook.metadata.kernelspec.name, path=self.cwd))
kernel_id = yield tornado.gen.maybe_future(self.multi_kernel_manager.start_kernel(kernel_name=self.notebook.metadata.kernelspec.name, path=self.cwd))
return kernel_id
kernel_id = wrapper().result()
self.kernel_started = True
return kernel_id

async def cell_generator(self, nb, kernel_id):
def cell_generator(self, nb, kernel_id):
"""Generator that will execute a single notebook cell at a time"""
assert self.kernel_started
km = self.multi_kernel_manager.get_kernel(kernel_id)
Expand All @@ -207,7 +213,7 @@ async def cell_generator(self, nb, kernel_id):

def notebook_execute(self, nb, kernel_id):
assert self.kernel_started
km = self.kernel_manager.get_kernel(kernel_id)
km = self.multi_kernel_manager.get_kernel(kernel_id)
result = executenb(nb, km=km, cwd=self.cwd, parent=self.multi_kernel_manager)
# result.cells = list(filter(lambda cell: filter_empty_code_cells(cell, self.exporter), result.cells))
# we modify the notebook in place, since the nb variable cannot be reassigned it seems in jinja2
Expand Down
16 changes: 11 additions & 5 deletions voila/execute_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

from .execute import CellExecutor

# As long as we support Python 3.5, we use this library to get async
# generators: https://pypi.org/project/async_generator/
from async_generator import async_generator, yield_


class CellExecutorAsyncio(CellExecutor):
def __init__(self, notebook, cwd, multi_kernel_manager=None):
Expand Down Expand Up @@ -112,16 +116,16 @@ async def _handle_io(self):

def process_io(self, msg):
msg_type = msg['msg_type']
msg_id = msg['parent_header'].get('msg_id', 'dummy')
content = msg['content']
# msg_id = msg['parent_header'].get('msg_id', 'dummy')
# content = msg['content']
if msg_type == 'status':
pass
# if content['execution_state'] == 'idle':
# self.log.debug('idle for: %s', msg_id)
# self.idle_future[msg_id].set_result(msg_id)
# self.idle_future[msg_id].set_result(msg_id)
elif msg_type == 'clear_output':
self.clear_output(cell.outputs, msg, cell_index)
pass # self.clear_output(cell.outputs, msg, cell_index)
elif msg_type.startswith('comm'):
self.handle_comm_msg(msg)
# Check for remaining messages we don't process
Expand All @@ -140,14 +144,16 @@ def output(self, msg):
def handle_comm_msg(self, msg):
pass

@async_generator
async def execute(self):
for i, cell in enumerate(self.cells):
cell = await self.execute_cell(i)
yield cell
await yield_(cell)

@async_generator
async def cell_generator(self, nb, kernel_id):
async for cell in self.execute():
yield cell
await yield_(cell)

async def execute_cell(self, index):
self.cell_index = index
Expand Down
47 changes: 6 additions & 41 deletions voila/execute_threaded.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,20 @@
import asyncio
import concurrent.futures
import inspect

import tornado.gen
from jupyter_client import KernelManager

from .execute import CellExecutor, executenb
from .threading import ThreadExecutor


class ThreadExecutor(concurrent.futures.ThreadPoolExecutor):
"""Executor that executes on a single thread and uses coroutines
We assume that if two jobs are submitted, they are executed in the same
order. This is important for executing cells in order. The implementatioån
of ThreadPoolExecutor works this way by using a Queue.
"""
def __init__(self):
super(ThreadExecutor, self).__init__(max_workers=1)

def submit(self, fn, *args, **kwargs):
"""This submit allows fn to also be a coroutine, which will be awaited for"""
def coroutine_wrapper(*args, **kwargs):
try:
ioloop_in_thread = asyncio.get_event_loop()
except RuntimeError:
ioloop_in_thread = None
if ioloop_in_thread is None:
ioloop_in_thread = asyncio.new_event_loop()
asyncio.set_event_loop(ioloop_in_thread)
return ioloop_in_thread.run_until_complete(fn(*args, **kwargs))
if inspect.iscoroutinefunction(fn):
return super(ThreadExecutor, self).submit(coroutine_wrapper, *args, **kwargs)
else:
return super(ThreadExecutor, self).submit(fn, *args, **kwargs)

async def submit_async(self, fn, *args, **kwargs):
"""A coroutine version of submit, since an asyncio Future is not a concurrent.future.Future
Allow for the following:
>>> import asyncio
>>> await executor.submit_async(asyncio.sleep, 1)
"""
ioloop = asyncio.get_event_loop()
return await ioloop.run_in_executor(self, fn, *args, **kwargs)
# As long as we support Python 3.5, we use this library to get async
# generators: https://pypi.org/project/async_generator/
from async_generator import async_generator, yield_


class CellExecutorThreaded(CellExecutor):
"""Executes the notebook cells in a thread.
For the network/zmq layer it is important that all calls are done from the
same thread, which is the reason we use a single thread.
"""
def __init__(self, notebook, cwd, multi_kernel_manager):
self.notebook = notebook
Expand Down Expand Up @@ -83,6 +47,7 @@ async def start_kernel_in_thread():
self.kernel_id = await tornado.gen.maybe_future(self.multi_kernel_manager.start_kernel(kernel_name=self.notebook.metadata.kernelspec.name, path=self.cwd))
return await self.executor.submit_async(start_kernel_in_thread)

@async_generator
async def cell_generator(self, nb, kernel_id):
km = self.multi_kernel_manager.get_kernel(kernel_id)

Expand All @@ -101,4 +66,4 @@ def cell_execute(i):
cell_futures = [ioloop.run_in_executor(self.executor, cell_execute, i) for i in range(N)]
for cell_future in cell_futures:
cell = await cell_future
yield cell
await yield_(cell)
34 changes: 30 additions & 4 deletions voila/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@
# The full license is in the file LICENSE, distributed with this software. #
#############################################################################

import inspect
import mimetypes

import traitlets
from traitlets.config import Config

from jinja2 import contextfilter
from jinja2.utils import have_async_gen

from nbconvert.filters.markdown_mistune import IPythonRenderer, MarkdownWithMath
from nbconvert.exporters.html import HTMLExporter
from nbconvert.exporters.templateexporter import TemplateExporter
from nbconvert.filters.highlight import Highlight2HTML

from .threading import async_generator_to_thread

# As long as we support Python 3.5, we use this library to get async
# generators: https://pypi.org/project/async_generator/
from async_generator import async_generator, yield_


class VoilaMarkdownRenderer(IPythonRenderer):
"""Custom markdown renderer that inlines images"""
Expand Down Expand Up @@ -79,13 +87,14 @@ def _default_preprocessors(self):
def default_template_file(self):
return 'voila.tpl'

@async_generator
async def generate_from_notebook_node(self, nb, cwd, multi_kernel_manager, resources=None, **kw):
cell_executor = self.cell_executor_class(nb, cwd, multi_kernel_manager)
# cell_executor = ThreadedNotebookCellExecutor(self.kernel_manager, self.traitlet_config, exporter)
# cell_executor = AsyncioCellExecutor(self.kernel_manager, self.traitlet_config, exporter)
# this replaces from_notebook_node, but calls template.generate_async instead of template.render
extra_context = {
'kernel_start': cell_executor.kernel_start, # pass the result (not the future) to the template
'kernel_start': cell_executor.kernel_start,
'cell_generator': cell_executor.cell_generator,
'notebook_execute': cell_executor.notebook_execute,
}
Expand All @@ -110,13 +119,30 @@ async def generate_from_notebook_node(self, nb, cwd, multi_kernel_manager, resou
}

# Top level variables are passed to the template_exporter here.
async for output in self.template.generate_async(nb=nb_copy, resources=resources, **extra_context):
yield output, resources
if self.template.environment.is_async:
async for output in self.template.generate_async(nb=nb_copy, resources=resources, **extra_context):
await yield_((output, resources))
else:
# Jinja with Python3.5 does not support async (generators), which
# means that it's not async all the way down. Which means that we
# cannot use coroutines for the cell_generator, and that they will
# block the IO loop. In that case we will run the iterator in a
# thread instead.
assert inspect.iscoroutinefunction(cell_executor.kernel_start) is False, 'The cell executor\'s kernel_start should not be a coroutine for Python3.5'
assert inspect.iscoroutinefunction(cell_executor.cell_generator) is False, 'The cell executor\'s cell_generator should not not be a coroutine for Python3.5'

@async_generator
async def async_jinja_generator():
for output in self.template.generate(nb=nb_copy, resources=resources, **extra_context):
await yield_((output, resources))
threaded_async_jinja_generator = async_generator_to_thread(async_jinja_generator)
async for output, resources in threaded_async_jinja_generator():
await yield_((output, resources))

@property
def environment(self):
env = super(type(self), self).environment
if 'jinja2.ext.do' not in env.extensions:
env.add_extension('jinja2.ext.do')
env.is_async = True
env.is_async = have_async_gen
return env
90 changes: 90 additions & 0 deletions voila/threading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
import concurrent.futures
import inspect
import threading
import tornado.queues

# As long as we support Python 3.5, we use this library to get async
# generators: https://pypi.org/project/async_generator/
from async_generator import async_generator, yield_


class ThreadExecutor(concurrent.futures.ThreadPoolExecutor):
    """Executor that executes on a single thread and uses coroutines

    We assume that if two jobs are submitted, they are executed in the same
    order. This is important for executing cells in order. The implementation
    of ThreadPoolExecutor works this way by using a Queue.
    """

    def __init__(self):
        # A single worker keeps every submitted job on the same thread.
        super(ThreadExecutor, self).__init__(max_workers=1)

    def submit(self, fn, *args, **kwargs):
        """Schedule fn(*args, **kwargs) on the worker thread.

        This submit allows fn to also be a coroutine function, in which case
        the coroutine is run to completion on an event loop that belongs to
        the worker thread.

        Returns a concurrent.futures.Future holding the result of fn.
        """
        def coroutine_wrapper(*args, **kwargs):
            # Reuse the loop already installed on this thread if there is
            # one, otherwise create and install a new one.
            try:
                ioloop_in_thread = asyncio.get_event_loop()
            except RuntimeError:
                ioloop_in_thread = None
            if ioloop_in_thread is None:
                ioloop_in_thread = asyncio.new_event_loop()
                asyncio.set_event_loop(ioloop_in_thread)
            return ioloop_in_thread.run_until_complete(fn(*args, **kwargs))
        if inspect.iscoroutinefunction(fn):
            return super(ThreadExecutor, self).submit(coroutine_wrapper, *args, **kwargs)
        else:
            return super(ThreadExecutor, self).submit(fn, *args, **kwargs)

    async def submit_async(self, fn, *args, **kwargs):
        """A coroutine version of submit, since an asyncio Future is not a concurrent.futures.Future

        Allow for the following:

        >>> import asyncio
        >>> await executor.submit_async(asyncio.sleep, 1)

        Positional and keyword arguments are both forwarded to fn.
        """
        ioloop = asyncio.get_event_loop()
        # loop.run_in_executor() only forwards positional arguments, so passing
        # **kwargs to it raises TypeError. Submitting ourselves and wrapping the
        # concurrent future keeps keyword arguments working while still going
        # through self.submit (and thus the coroutine support above).
        future = self.submit(fn, *args, **kwargs)
        return await asyncio.wrap_future(future, loop=ioloop)


class ThreadedAsyncGenerator(threading.Thread):
    """Runs an async generator function in its own thread and relays its items.

    The thread is started from the constructor. It iterates over
    fn(*args, **kwargs) on a private event loop and hands every item to
    main_ioloop, where it is placed on a queue. Iterating this object with
    `async for` yields the queued items until the end-of-stream marker
    arrives.
    """

    def __init__(self, main_ioloop, fn, *args, **kwargs):
        """Store the generator function and its arguments and start the thread.

        main_ioloop is the event loop on which the items are queued; it must
        provide call_soon_threadsafe.
        """
        super(ThreadedAsyncGenerator, self).__init__()
        self.main_ioloop = main_ioloop
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.queue = tornado.queues.Queue()
        self.start()

    def run(self):
        """Thread entry point: drive _run() on an event loop owned by this thread."""
        ioloop_in_thread = asyncio.new_event_loop()
        asyncio.set_event_loop(ioloop_in_thread)
        try:
            return ioloop_in_thread.run_until_complete(self._run())
        finally:
            # The loop is private to this thread; close it so it is released
            # as soon as the generator is done.
            ioloop_in_thread.close()

    async def _run(self):
        """Forward every generated item, then the end marker, to the main loop."""
        try:
            async for item in self.fn(*self.args, **self.kwargs):
                # Bind item as a default argument so each callback keeps its
                # own value; the queue is only touched via the main loop.
                def thread_safe_put(item=item):
                    self.queue.put(item)
                self.main_ioloop.call_soon_threadsafe(thread_safe_put)
        finally:
            # Always signal the end of the stream, also when the generator
            # raised, otherwise the consumer would wait on the queue forever.
            # The exception itself still propagates out of this thread.
            def thread_safe_end():
                self.queue.put(StopIteration)
            self.main_ioloop.call_soon_threadsafe(thread_safe_end)

    @async_generator
    async def __aiter__(self):
        """Yield queued items until the StopIteration marker is received."""
        while True:
            value = await self.queue.get()
            # Identity check: the marker is the StopIteration class itself, and
            # `is` never invokes a custom __eq__ on the queued items.
            if value is StopIteration:
                break
            await yield_(value)


def async_generator_to_thread(fn):
    """Wrap async generator function fn so that it is iterated in a thread.

    The event loop that is current when this function is called is the loop
    that receives the generated items. Calling the returned function starts a
    ThreadedAsyncGenerator for fn with the given arguments and returns it, so
    the result can be consumed with `async for`.
    """
    main_loop = asyncio.get_event_loop()

    def start_in_thread(*args, **kwargs):
        return ThreadedAsyncGenerator(main_loop, fn, *args, **kwargs)

    return start_in_thread

0 comments on commit 45d0368

Please sign in to comment.