Skip to content

Commit

Permalink
Merge pull request #1017 from alexrudy/fix-history-multiprocessing
Browse files Browse the repository at this point in the history
Disable IPython History in executing preprocessor
  • Loading branch information
MSeal authored May 23, 2019
2 parents a863787 + 0c7d7ba commit 4d8efc0
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 5 deletions.
16 changes: 16 additions & 0 deletions nbconvert/preprocessors/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,20 @@ class ExecutePreprocessor(Preprocessor):
)
).tag(config=True)

# Configurable path of the SQLite history database handed to IPython kernels.
# The default, ':memory:', avoids creating a history file (see the help text).
ipython_hist_file = Unicode(
    default_value=':memory:',
    help="""Path to file to use for SQLite history database for an IPython kernel.
    The specific value `:memory:` (including the colon
    at both ends but not the back ticks), avoids creating a history file. Otherwise, IPython
    will create a history file for each kernel.
    When running kernels simultaneously (e.g. via multiprocessing) saving history to a single
    SQLite file can result in database errors, so using `:memory:` is recommended in non-interactive
    contexts.
    """).tag(config=True)

kernel_manager_class = Type(
config=True,
help='The kernel manager class to use.'
Expand Down Expand Up @@ -272,6 +286,8 @@ def start_new_kernel(self, **kwargs):
'kernelspec', {}).get('name', 'python')
km = self.kernel_manager_class(kernel_name=self.kernel_name,
config=self.config)
if km.ipykernel and self.ipython_hist_file:
self.extra_arguments += ['--HistoryManager.hist_file={}'.format(self.ipython_hist_file)]
km.start_kernel(extra_arguments=self.extra_arguments, **kwargs)

kc = km.client()
Expand Down
28 changes: 28 additions & 0 deletions nbconvert/preprocessors/tests/files/Check History in Memory.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from IPython import get_ipython"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"ip = get_ipython()\n",
"assert ip.history_manager.hist_file == ':memory:'"
]
}
],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 2
}
65 changes: 65 additions & 0 deletions nbconvert/preprocessors/tests/files/Parallel Execute.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Ensure notebooks can execute in parallel\n",
"\n",
"This notebook uses a file system based \"lock\" to assert that two instances of the notebook kernel will run in parallel. Each instance writes to a file in a temporary directory, and then tries to read the other file from\n",
"the temporary directory, so that running them in sequence will fail, but running them in parallel will succeed.\n",
"\n",
"Two notebooks are launched, each with an injected cell which sets the `this_notebook` variable. One notebook is set to `this_notebook = 'A'` and the other `this_notebook = 'B'`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import os.path\n",
"import tempfile\n",
"import time"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# the variable this_notebook is injected in a cell above by the test framework.\n",
"other_notebook = {'A':'B', 'B':'A'}[this_notebook]\n",
"directory = os.environ['NBEXECUTE_TEST_PARALLEL_TMPDIR']\n",
"with open(os.path.join(directory, 'test_file_{}.txt'.format(this_notebook)), 'w') as f:\n",
" f.write('Hello from {}'.format(this_notebook))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start = time.time()\n",
"timeout = 5\n",
"end = start + timeout\n",
"target_file = os.path.join(directory, 'test_file_{}.txt'.format(other_notebook))\n",
"while time.time() < end:\n",
" time.sleep(0.1)\n",
" if os.path.exists(target_file):\n",
" with open(target_file, 'r') as f:\n",
" text = f.read()\n",
" if text == 'Hello from {}'.format(other_notebook):\n",
" break\n",
"else:\n",
" assert False, \"Timed out – didn't get a message from {}\".format(other_notebook)"
]
}
],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 2
}
61 changes: 56 additions & 5 deletions nbconvert/preprocessors/tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io
import os
import re
import threading
import multiprocessing as mp

import nbformat
import sys
Expand Down Expand Up @@ -73,14 +75,17 @@ def build_preprocessor(opts):
return preprocessor


def run_notebook(filename, opts, resources):
def run_notebook(filename, opts, resources, preprocess_notebook=None):
"""Loads and runs a notebook, returning both the version prior to
running it and the version after running it.
"""
with io.open(filename) as f:
input_nb = nbformat.read(f, 4)

if preprocess_notebook:
input_nb = preprocess_notebook(input_nb)

preprocessor = build_preprocessor(opts)
cleaned_input_nb = copy.deepcopy(input_nb)
for cell in cleaned_input_nb.cells:
Expand Down Expand Up @@ -223,6 +228,13 @@ def assert_notebooks_equal(expected, actual):
actual_execution_count = actual_cell.get('execution_count', None)
assert expected_execution_count == actual_execution_count

def notebook_resources():
    """Build the resources mapping for executing test notebooks in the `files` folder.

    Returns:
        A ``ResourcesDict`` whose ``'metadata'`` entry is another
        ``ResourcesDict`` with ``'path'`` set to the ``files`` directory
        under ``current_dir``.
    """
    metadata = ResourcesDict()
    metadata['path'] = os.path.join(current_dir, 'files')

    resources = ResourcesDict()
    resources['metadata'] = metadata
    return resources


@pytest.mark.parametrize(
["input_name", "opts"],
Expand All @@ -243,18 +255,57 @@ def assert_notebooks_equal(expected, actual):
("Unicode.ipynb", dict(kernel_name="python")),
("UnicodePy3.ipynb", dict(kernel_name="python")),
("update-display-id.ipynb", dict(kernel_name="python")),
("Check History in Memory.ipynb", dict(kernel_name="python")),
]
)
def test_run_all_notebooks(input_name, opts):
"""Runs a series of test notebooks and compares them to their actual output"""
input_file = os.path.join(current_dir, 'files', input_name)
res = ResourcesDict()
res['metadata'] = ResourcesDict()
res['metadata']['path'] = os.path.join(current_dir, 'files')
input_nb, output_nb = run_notebook(input_file, opts, res)
input_nb, output_nb = run_notebook(input_file, opts, notebook_resources())
assert_notebooks_equal(input_nb, output_nb)


def label_parallel_notebook(nb, label):
    """Inject a code cell assigning `label` to the variable `this_notebook`.

    Used by the parallel execution test to tell apart two copies of the same
    notebook that are run simultaneously.

    Args:
        nb: Notebook to modify; the new cell is inserted in place at index 1.
        label: String substituted into the ``this_notebook = '...'`` source.

    Returns:
        The same notebook object that was passed in.
    """
    assignment = "this_notebook = '{}'".format(label)
    nb.cells.insert(1, nbformat.v4.new_code_cell(source=assignment))
    return nb


def test_parallel_notebooks(capfd, tmpdir):
    """Two notebooks should be able to be run simultaneously without problems.

    Both runs execute "Parallel Execute.ipynb", labelled 'A' and 'B'. The
    notebooks use the filesystem (the directory exported through
    NBEXECUTE_TEST_PARALLEL_TMPDIR) to check that the other notebook wrote
    to the filesystem. The test passes when nothing was written to stderr.
    """
    notebook_path = os.path.join(current_dir, "files", "Parallel Execute.ipynb")
    kernel_opts = dict(kernel_name="python")
    # A single resources object is handed to both runs.
    shared_resources = notebook_resources()

    with modified_env({"NBEXECUTE_TEST_PARALLEL_TMPDIR": str(tmpdir)}):
        workers = []
        for label in ("A", "B"):
            # Each run gets its own hook that injects its label cell.
            inject_label = functools.partial(label_parallel_notebook, label=label)
            worker = threading.Thread(
                target=run_notebook,
                args=(notebook_path, kernel_opts, shared_resources, inject_label),
            )
            workers.append(worker)

        # Start every thread before waiting on any of them so they overlap.
        for worker in workers:
            worker.start()
        # Wait at most 2 seconds for each thread.
        for worker in workers:
            worker.join(timeout=2)

    outcome = capfd.readouterr()
    assert outcome.err == ""


class TestExecute(PreprocessorTestsBase):
"""Contains test functions for execute.py"""
maxDiff = None
Expand Down

0 comments on commit 4d8efc0

Please sign in to comment.