Skip to content

Commit

Permalink
Refactored process_message to raise CellExecutionComplete on completion
Browse files Browse the repository at this point in the history
  • Loading branch information
MSeal committed Apr 15, 2019
1 parent 83a4e72 commit f2a99cc
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 29 deletions.
77 changes: 49 additions & 28 deletions nbconvert/preprocessors/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from ..utils.exceptions import ConversionException


class CellExecutionComplete(Exception): pass # Used as a control signal

class CellExecutionError(ConversionException):
"""
Custom exception to propagate exceptions that are raised during
Expand Down Expand Up @@ -395,14 +397,13 @@ def preprocess_cell(self, cell, resources, cell_index):
if cell.cell_type != 'code' or not cell.source.strip():
return cell, resources

reply, outputs = self.run_cell(cell, cell_index)
cell.outputs = outputs
reply = self.run_cell(cell, cell_index)

cell_allows_errors = (self.allow_errors or "raises-exception"
in cell.metadata.get("tags", []))

if self.force_raise_errors or not cell_allows_errors:
for out in outputs:
for out in cell.outputs:
if out.output_type == 'error':
raise CellExecutionError.from_cell_and_msg(cell, out)
if (reply is not None) and reply['content']['status'] == 'error':
Expand Down Expand Up @@ -507,23 +508,46 @@ def run_cell(self, cell, cell_index=0):
# not an output from our execution
continue

if not self.process_message(msg, cell, cell_index):
# Will raise CellExecutionComplete when completed
try:
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
break

return exec_reply, cell.outputs
return exec_reply

def process_message(self, msg, cell, cell_index):
'''
Returns None if execution should be halted.
'''
"""
Processes a kernel message, updates cell state, and returns the
resulting output object that was appended to cell.outputs.
The input argument `cell` is modified in-place.
Parameters
----------
msg : dict
The kernel message being processed.
cell : nbformat.NotebookNode
The cell which is currently being processed.
cell_index : int
The position of the cell within the notebook object.
Returns
-------
output : dict
The execution output payload (or None for no output).
Raises
------
CellExecutionComplete
Once a message arrives which indicates computation completeness.
"""
msg_type = msg['msg_type']
self.log.debug("msg_type: %s", msg_type)
content = msg['content']
self.log.debug("content: %s", content)

# Default to our input as the "result" of processing the message
result = msg

display_id = content.get('transient', {}).get('display_id', None)
if display_id and msg_type in {'execute_result', 'display_data', 'update_display_data'}:
self._update_display_id(display_id, msg)
Expand All @@ -534,45 +558,42 @@ def process_message(self, msg, cell, cell_index):

if msg_type == 'status':
if content['execution_state'] == 'idle':
# Set result to None to halt execution
result = None
raise CellExecutionComplete()
elif msg_type == 'clear_output':
self.clear_output(cell.outputs, msg, cell_index)
elif msg_type.startswith('comm'):
self.handle_comm_msg(cell.outputs, msg, cell_index)
# Check for remaining messages we don't process
elif not (msg_type in ['execute_input', 'update_display_data'] or msg_type.startswith('comm')):
try:
# Assign output as our processed "result"
result = output_from_msg(msg)
except ValueError:
self.log.error("unhandled iopub msg: " + msg_type)
else:
self.output(cell.outputs, msg, display_id, cell_index)

return result
elif msg_type not in ['execute_input', 'update_display_data']:
# Assign output as our processed "result"
return self.output(cell.outputs, msg, display_id, cell_index)

def output(self, outs, msg, display_id, cell_index):
msg_type = msg['msg_type']
if self.clear_before_next_output:
self.log.debug('Executing delayed clear_output')
outs[:] = []
self.clear_display_id_mapping(cell_index)
self.clear_before_next_output = False

try:
out = output_from_msg(msg)
except ValueError:
self.log.error("unhandled iopub msg: " + msg_type)
return
else:
if self.clear_before_next_output:
self.log.debug('Executing delayed clear_output')
outs[:] = []
self.clear_display_id_mapping(cell_index)
self.clear_before_next_output = False

if display_id:
# record output index in:
# _display_id_map[display_id][cell_idx]
cell_map = self._display_id_map.setdefault(display_id, {})
output_idx_list = cell_map.setdefault(cell_index, [])
output_idx_list.append(len(outs))

outs.append(out)

return out

def clear_output(self, outs, msg, cell_index):
content = msg['content']
if content.get('wait'):
Expand Down
2 changes: 1 addition & 1 deletion nbconvert/preprocessors/tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def test_process_message_wrapper(self):
class WrappedPreProc(ExecutePreprocessor):
def process_message(self, msg, cell, cell_index):
result = super(WrappedPreProc, self).process_message(msg, cell, cell_index)
if result and result != msg:
if result:
outputs.append(result)
return result

Expand Down

0 comments on commit f2a99cc

Please sign in to comment.