Skip to content

Commit

Permalink
[Viewer] Fix kernel exception and stack corruption in notebooks (#190)
Browse files Browse the repository at this point in the history
* Only process kernel comm messages on the spot. 
* Do not examine ipython port while looking for zmq meshcat.

Co-authored-by: Alexis Duburcq <alexis.duburcq@wandercraft.eu>
  • Loading branch information
duburcqa and Alexis Duburcq authored Sep 1, 2020
1 parent 7e56f01 commit d6a5c86
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 15 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
cmake_minimum_required(VERSION 3.10)

# Set the build version
set(BUILD_VERSION 1.3.5)
set(BUILD_VERSION 1.3.6)

# Add definition of Jiminy version for C++ headers
add_definitions("-DJIMINY_VERSION=\"${BUILD_VERSION}\"")
Expand Down
2 changes: 1 addition & 1 deletion python/jiminy_py/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def finalize_options(self):
packages = find_packages('src'),
package_dir = {'': 'src'},
package_data = {'jiminy_py': ['**/*.dll', '**/*.so', '**/*.pyd', '**/*.html', '**/*.js']},
entry_points={'console_scripts': [
entry_points = {'console_scripts': [
'jiminy_plot=jiminy_py.log:plot_log',
'jiminy_meshcat_server=jiminy_py.meshcat.server:start_meshcat_server_standalone'
]},
Expand Down
101 changes: 89 additions & 12 deletions python/jiminy_py/src/jiminy_py/meshcat/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,97 @@ def is_notebook():
return 0 # Unidentified type


# Monkey-patch meshcat ViewerWindow 'send' method to process queued comm
# messages. Otherwise, new opening comm will not be detected soon enough.
if is_notebook():
from IPython import get_ipython
import tornado.gen
from ipykernel.kernelbase import SHELL_PRIORITY

class CommProcessor:
"""
@brief Re-implementation of ipykernel.kernelbase.do_one_iteration
to only handle comm messages on the spot, and put back in
the stack the other ones.
@details Calling 'do_one_iteration' messes up with kernel
'msg_queue'. Some messages will be processed too soon,
which is likely to corrupt the kernel state. This method
only processes comm messages to avoid such side effects.
"""

def __init__(self):
self.__kernel = get_ipython().kernel
self.qsize_old = 0

def __call__(self, unsafe=False):
"""
@brief Check once if there is pending comm related event in
the shell stream message priority queue.
@param[in] unsafe Whether or not to assume check if the number
of pending message has changed is enough. It
makes the evaluation much faster but flawed.
"""
# Flush every IN messages on shell_stream only
# Note that it is a faster implementation of ZMQStream.flush
# to only handle incoming messages. It reduces the computation
# from about 15us to 15ns.
# https://github.com/zeromq/pyzmq/blob/e424f83ceb0856204c96b1abac93a1cfe205df4a/zmq/eventloop/zmqstream.py#L313
shell_stream = self.__kernel.shell_streams[0]
shell_stream.poller.register(shell_stream.socket, zmq.POLLIN)
events = shell_stream.poller.poll(0)
while events:
_, event = events[0]
if event:
shell_stream._handle_recv()
shell_stream.poller.register(
shell_stream.socket, zmq.POLLIN)
events = shell_stream.poller.poll(0)

qsize = self.__kernel.msg_queue.qsize()
if unsafe and qsize == self.qsize_old:
# The number of queued messages in the queue has not changed
# since it last time it has been checked. Assuming those
# messages are the same has before and returning earlier.
return

# One must go through all the messages to keep them in order
for _ in range(qsize):
priority, t, dispatch, args = \
self.__kernel.msg_queue.get_nowait()
if priority <= SHELL_PRIORITY:
_, msg = self.__kernel.session.feed_identities(
args[1], copy=False)
msg = self.__kernel.session.deserialize(
msg, content=False, copy=False)
else:
# Do not spend time analyzing already rejected message
msg = None
if msg is None or not 'comm_' in msg['header']['msg_type']:
# The message is not related to comm, so putting it back in
# the queue after lowering its priority so that it is send
# at the "end of the queue", ie just at the right place:
# after the next unchecked messages, after the other
# messages already put back in the queue, but before the
# next one to go the same way. Note that every shell
# messages have SHELL_PRIORITY by default.
self.__kernel.msg_queue.put_nowait(
(SHELL_PRIORITY + 1, t, dispatch, args))
else:
# Comm message. Processing it right now.
tornado.gen.maybe_future(dispatch(*args))
self.qsize_old = self.__kernel.msg_queue.qsize()

process_kernel_comm = CommProcessor()

# Monkey-patch meshcat ViewerWindow 'send' method to process queued comm
# messages. Otherwise, new opening comm will not be detected soon enough.
_send_orig = meshcat.visualizer.ViewerWindow.send
def _send(self, command):
_send_orig(self, command)
get_ipython().kernel.do_one_iteration()
# Check on new comm related messages. Unsafe in enabled to avoid
# potentially significant overhead. At this point several safe should
# have been executed, so it is much less likely than comm messages
# will slip through the net.
process_kernel_comm(unsafe=True)
meshcat.visualizer.ViewerWindow.send = _send


Expand Down Expand Up @@ -92,11 +175,6 @@ def __comm_register(self, comm, msg):
# is to interleave blocking code with call of 'kernel.do_one_iteration'
# or 'await kernel.process_one(wait=True)'. See Stackoverflow for ref.
# https://stackoverflow.com/questions/63651823/direct-communication-between-javascript-in-jupyter-and-server-via-ipython-kernel/63666477#63666477
# TODO Calling 'do_one_iteration' messes up with the kernel 'msg_queue',
# so that some messages will be processed too soon. It would be better
# to deal with the stack manually, 'get' each messages, and check if it
# must be handle now are later, then put it back in the stack if so.
# https://github.com/ipython/ipykernel/blob/e048a93d93e11b19e25fb13c4eb7b4cb44ea081c/ipykernel/kernelbase.py#L348
@comm.on_msg
def _on_msg(msg):
self.n_message += 1
Expand Down Expand Up @@ -149,7 +227,6 @@ def __init__(self, zmq_url=None, comm_url=None):
# implementation of Meshcat.
self.comm_manager = None
if must_launch_server and is_notebook():
self.__kernel = get_ipython().kernel
self.comm_manager = CommManager(comm_url)

# Make sure the server is properly closed
Expand Down Expand Up @@ -186,13 +263,13 @@ def wait(self, require_client=False):
# point. Fetching new incoming messages and retrying.
# By doing this, opening a websocket or comm should
# be enough to successfully recv the acknowledgement.
self.__kernel.do_one_iteration()
process_kernel_comm()

self.__zmq_socket.send(b"ready")
if self.comm_manager is not None:
self.comm_manager.n_message = 0
while self.comm_manager.n_message < self.comm_manager.n_comm:
self.__kernel.do_one_iteration()
process_kernel_comm()
return self.__zmq_socket.recv().decode("utf-8")

def start_recording(self, fps, width, height):
Expand Down
16 changes: 15 additions & 1 deletion python/jiminy_py/src/jiminy_py/viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,17 @@ def _gepetto_client_connect(get_proc_info=False):
if 'python' in cmdline[0] or 'meshcat' in cmdline[-1]:
meshcat_candidate_conn.append(conn)

# Exclude ipython kernel ports from the look up because sending a
# message on ipython ports will throw a low-level exception, that
# is not blocking on Jupyter, but is on Google Colab.
excluded_ports = []
if is_notebook():
try:
excluded_ports += list(
get_ipython().kernel._recorded_ports.values())
except (NameError, AttributeError):
pass # No Ipython kernel running

# Use the first port responding to zmq request, if any
zmq_url = None
context = zmq.Context.instance()
Expand All @@ -708,7 +719,10 @@ def _gepetto_client_connect(get_proc_info=False):
# Note that the timeout must be long enough to give enough
# time to the server to respond, but not to long to avoid
# sending to much time spanning the available connections.
zmq_url = f"tcp://127.0.0.1:{conn.laddr.port}"
port = conn.laddr.port
if port in excluded_ports:
continue
zmq_url = f"tcp://127.0.0.1:{port}"
zmq_socket = context.socket(zmq.REQ)
zmq_socket.RCVTIMEO = 250 # millisecond
zmq_socket.connect(zmq_url)
Expand Down

0 comments on commit d6a5c86

Please sign in to comment.