Skip to content

Commit

Permalink
[python/viewer] Generic direct connection through ipykernel.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexis Duburcq committed Jan 20, 2022
1 parent 57dbdd1 commit 9df3afc
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 37 deletions.
121 changes: 112 additions & 9 deletions python/jiminy_py/src/jiminy_py/viewer/meshcat/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
}
};

// Connect the viewer to the existing server, using the
// usual websocket on desktop, though kernel communication
// in Google Colaboratory or Jupyter notebooks.
// Connect the viewer to the existing server, though direct ZMP
// websocket in standalone browser or though kernel communication
// in notebooks.
try {
// Set externally by python in interactive mode
var ws_path = undefined;

if (typeof google !== 'undefined') {
(async () => {
viewer.connection = await google.colab.kernel.comms.open("meshcat", "meshcat:open");
Expand All @@ -58,16 +61,116 @@
viewer.handle_command_bytearray(new Uint8Array(message.buffers[0].buffer));
});
viewer.connection.on_close(function(message) {
viewer.connection = null; // The connection is no longer available
console.log("connection to Jupyter kernel closed:", message);
viewer.connection = null; // The connection is no longer available
});
}
else {
var ws_url = undefined;
viewer.connect(ws_url);
else if (ws_path !== undefined) {
// Connect to kernel socket manually if necessary, namely for
// VSCode notebooks and jupyterlab.
const ws_url = "ws" + window.parent.location.origin.substring(4) + ws_path;
viewer.connection = new window.WebSocket(ws_url);

// Define UUID generation utility to identify the comm and messages
function uuid() {
return ([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g, c =>
(c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16)
);
};

// Define message deserialization method
var deserialize_array_buffer = function (buf) {
var data = new DataView(buf);
var nbufs = data.getUint32(0);
var offsets = [];
var i;
for (i = 1; i <= nbufs; i++) {
offsets.push(data.getUint32(i * 4));
}
var json_bytes = new Uint8Array(buf.slice(offsets[0], offsets[1]));
var msg = JSON.parse(
(new TextDecoder('utf8')).decode(json_bytes)
);
msg.buffers = [];
var start, stop;
for (i = 1; i < nbufs; i++) {
start = offsets[i];
stop = offsets[i+1] || buf.byteLength;
msg.buffers.push(new DataView(buf.slice(start, stop)));
}
return msg;
};

// Create unique comm identifier
const comm_id = uuid();

// Monkey-patch send command
var send = viewer.connection.send;
viewer.connection.send = function(data, msg_type) {
var msg = {
header : {
date: new Date().toISOString(),
username : "meshcat",
msg_id : uuid(),
session : "000000000000",
version : "5.3",
msg_type : msg_type || "comm_msg"
},
metadata : {},
content : {
comm_id : comm_id,
target_name : "meshcat",
data : data,
},
channel : 'shell',
buffers : [],
parent_header : {}
};
send.call(this, JSON.stringify(msg));
};

// Monkey-patch close command
var close = viewer.connection.close;
viewer.connection.close = function() {
// For some reason, `onclose` is never called, and
// calling the original `close` interferes with the
// send method and the message is never received.
viewer.connection.send("meshcat:close", "comm_close");
};

// Define event handler
viewer.connection.onopen = function(event) {
console.log("connection to generic ipykernel:", viewer.connection);
viewer.connection.send("meshcat:open", "comm_open");
};
viewer.connection.onmessage = async function (event) {
var data = event.data;
if (data instanceof Blob) {
const reader = new FileReader();
reader.addEventListener('loadend', () => {
var p = Promise.resolve(deserialize_array_buffer(reader.result));
p.then(function(msg) {
if (msg.content.comm_id === comm_id)
{
viewer.handle_command_bytearray(new Uint8Array(msg.buffers[0].buffer));
}
});
});
reader.readAsArrayBuffer(event.data);
}
};
viewer.connection.onclose = function (message) {
console.log("connection to generic ipykernel closed:", message);
viewer.connection = null; // The connection is no longer available
};
}
else
{
// Fallback to direct local communication through meshcat ZMQ socket
viewer.connect();
}
} catch (e) {
console.info("Not connected to MeshCat server: ", e);
console.info("not connected to MeshCat server: ", e);
}

// Replace the mesh grid by a filled checkerboard, similar to
Expand Down Expand Up @@ -143,7 +246,7 @@
// available.
if (viewer.connection !== null)
{
console.log("Closing connection...");
console.log("closing connection...");
viewer.connection.close();
}
return false;
Expand Down
1 change: 1 addition & 0 deletions python/jiminy_py/src/jiminy_py/viewer/meshcat/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def handle_comm(self, frames: Sequence[bytes]) -> None:
self.send_scene(comm_id=comm_id)
self.comm_pool.add(comm_id)
if self.is_waiting_ready_msg:
# Send request for acknowledgment a-posteriori
msg = umsgpack.packb({"type": "ready"})
self.forward_to_comm(comm_id, msg)
elif cmd.startswith("close:"):
Expand Down
38 changes: 23 additions & 15 deletions python/jiminy_py/src/jiminy_py/viewer/meshcat/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import pathlib
import umsgpack
import threading
import tornado.gen
import tornado.ioloop
from contextlib import redirect_stdout, redirect_stderr
from typing import Optional, Sequence, Dict, Any
Expand Down Expand Up @@ -113,7 +112,7 @@ def __call__(self, unsafe: bool = False) -> None:
# New message: reading message without deserializing its
# content at this point for efficiency.
_, msg = self.__kernel.session.feed_identities(
args[1], copy=False)
args[-1], copy=False)
msg = self.__kernel.session.deserialize(
msg, content=False, copy=False)
else:
Expand All @@ -122,20 +121,29 @@ def __call__(self, unsafe: bool = False) -> None:

if msg is not None and \
msg['header']['msg_type'].startswith('comm_'):
# Extract comm type and handler
comm_type = msg['header']['msg_type']
comm_handler = getattr(
self.__kernel.comm_manager, comm_type)

# Extract message content
content = self.__kernel.session.unpack(msg['content'])
data = content.get('data', '')

# Comm message. Analyzing message content to determine if
# it is related to meshcat or not.
if msg['header']['msg_type'] == 'comm_close':
if comm_type == 'comm_close':
# All comm_close messages are processed because Google
# Colab API does not support sending data on close.
data = "meshcat:close"
else:
content = self.__kernel.session.unpack(msg['content'])
data = content.get('data', '')
msg['content'] = content
comm_handler(None, None, msg)
continue
if isinstance(data, str) and data.startswith('meshcat:'):
# Comm message related to meshcat. Processing it right
# now and moving to the next message without puting it
# back into the queue.
tornado.gen.maybe_future(dispatch(*args))
msg['content'] = content
comm_handler(None, None, msg)
continue

# The message is not related to meshcat comm, so putting it
Expand Down Expand Up @@ -192,7 +200,7 @@ def forward_comm_thread():
self.__comm_socket = context.socket(zmq.XREQ)
self.__comm_socket.connect(comm_url)
self.__comm_stream = ZMQStream(self.__comm_socket, self.__ioloop)
self.__comm_stream.on_recv(self.__forward_to_ipython)
self.__comm_stream.on_recv(self.__forward_to_ipykernel)
self.__ioloop.start()
self.__ioloop.close()
self.__ioloop = None
Expand All @@ -213,25 +221,25 @@ def __del__(self) -> None:
def close(self) -> None:
self.n_comm = 0
self.n_message = 0
self.__kernel.comm_manager.unregister_target(
'meshcat', self.__comm_register)
if 'meshcat' in self.__kernel.comm_manager.targets:
self.__kernel.comm_manager.unregister_target(
'meshcat', self.__comm_register)
self.__comm_stream.close(linger=5)
self.__comm_socket.close(linger=5)
self.__ioloop.add_callback(lambda: self.__ioloop.stop())
self.__thread.join()
self.__thread = None

def __forward_to_ipython(self, frames: Sequence[bytes]) -> None:
def __forward_to_ipykernel(self, frames: Sequence[bytes]) -> None:
comm_id, cmd = frames # There must be always two parts each messages
comm_id = comm_id.decode()
try:
comm = self.__kernel.comm_manager.comms[comm_id]
comm.send(buffers=[cmd])
except KeyError:
# The comm has probably been closed without the server knowing.
# Sending the notification to the server to consider it as such.
self.__comm_socket.send(f"close:{comm_id}".encode())
else:
comm.send(buffers=[cmd])

def __comm_register(self,
comm: 'ipykernel.comm.Comm', # noqa
Expand All @@ -240,7 +248,7 @@ def __comm_register(self,
# mechanism: if the main thread is already busy for some reason, for
# instance waiting for a reply from the server ZMQ socket, then
# `comm.on_msg` will NOT triggered automatically. It is only triggered
# automatically once every other tacks has been process. The workaround
# automatically once every other tasks has been process. The workaround
# is to interleave blocking code with call of `kernel.do_one_iteration`
# or `await kernel.process_one(wait=True)`. See Stackoverflow for ref:
# https://stackoverflow.com/a/63666477/4820605
Expand Down
41 changes: 28 additions & 13 deletions python/jiminy_py/src/jiminy_py/viewer/viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import multiprocessing
import xml.etree.ElementTree as ET
from copy import deepcopy
from urllib.parse import urlparse
from urllib.request import urlopen
from functools import wraps, partial
from bisect import bisect_right
Expand Down Expand Up @@ -52,7 +51,7 @@
Panda3dVisualizer)


DISPLAY_FRAMERATE = 30
REPLAY_FRAMERATE = 40

CAMERA_INV_TRANSFORM_PANDA3D = rpyToMatrix(-np.pi/2, 0.0, 0.0)
CAMERA_INV_TRANSFORM_MESHCAT = rpyToMatrix(-np.pi/2, 0.0, 0.0)
Expand Down Expand Up @@ -851,10 +850,26 @@ def open_gui(start_if_needed: bool = False) -> bool:

# Provide websocket URL as fallback if needed. It would be
# the case if the environment is not jupyter-notebook nor
# colab but rather japyterlab or vscode for instance.
web_url = f"ws://{urlparse(viewer_url).netloc}"
# colab but rather jupyterlab or vscode for instance.
from IPython import get_ipython
from notebook import notebookapp
kernel = get_ipython().kernel
conn_file = kernel.config['IPKernelApp']['connection_file']
kernel_id = conn_file.split('-', 1)[1].split('.')[0]
server_pid = psutil.Process(os.getpid()).parent().pid
server_list = list(notebookapp.list_running_servers())
try:
from jupyter_server import serverapp
server_list += list(serverapp.list_running_servers())
except ImportError:
pass
for server_info in server_list:
if server_info['pid'] == server_pid:
break
ws_path = (f"{server_info['base_url']}api/kernels/{kernel_id}"
f"/channels?token={server_info['token']}")
html_content = html_content.replace(
"var ws_url = undefined;", f'var ws_url = "{web_url}";')
"var ws_path = undefined;", f'var ws_path = "{ws_path}";')

if interactive_mode() == 1:
# Embed HTML in iframe on Jupyter, since it is not
Expand Down Expand Up @@ -905,8 +920,8 @@ def has_gui() -> bool:
comm_manager = Viewer._backend_obj.comm_manager
if comm_manager is not None:
ack = Viewer._backend_obj.wait(require_client=False)
Viewer._has_gui = any([
msg == "meshcat:ok" for msg in ack.split(",")])
Viewer._has_gui = any(
msg == "ok" for msg in ack.split(","))
return Viewer._has_gui
return False

Expand Down Expand Up @@ -1223,7 +1238,7 @@ def _gepetto_client_connect(get_proc_info=False):
open_gui = True

# List of connections likely to correspond to Meshcat servers
meshcat_candidate_conn = []
meshcat_candidate_conn = {}
for pid in psutil.pids():
try:
proc = psutil.Process(pid)
Expand All @@ -1234,7 +1249,7 @@ def _gepetto_client_connect(get_proc_info=False):
cmdline = proc.cmdline()
if cmdline and ('python' in cmdline[0].lower() or
'meshcat' in cmdline[-1]):
meshcat_candidate_conn.append(conn)
meshcat_candidate_conn[pid] = conn
except (psutil.AccessDenied,
psutil.ZombieProcess,
psutil.NoSuchProcess):
Expand All @@ -1254,7 +1269,7 @@ def _gepetto_client_connect(get_proc_info=False):
# Use the first port responding to zmq request, if any
zmq_url = None
context = zmq.Context.instance()
for conn in meshcat_candidate_conn:
for pid, conn in meshcat_candidate_conn.items():
try:
# Note that the timeout must be long enough to give enough
# time to the server to respond, but not to long to avoid
Expand All @@ -1264,7 +1279,7 @@ def _gepetto_client_connect(get_proc_info=False):
continue
zmq_url = f"tcp://127.0.0.1:{port}"
zmq_socket = context.socket(zmq.REQ)
zmq_socket.RCVTIMEO = 250 # millisecond
zmq_socket.RCVTIMEO = 200 # millisecond
zmq_socket.connect(zmq_url)
zmq_socket.send(b"url")
response = zmq_socket.recv().decode("utf-8")
Expand All @@ -1285,7 +1300,7 @@ def _gepetto_client_connect(get_proc_info=False):
# Create a meshcat server if needed and connect to it
client = MeshcatWrapper(zmq_url)
if client.server_proc is None:
proc = psutil.Process(conn.pid)
proc = psutil.Process(pid)
else:
proc = client.server_proc
proc = _ProcessWrapper(proc, close_at_exit)
Expand Down Expand Up @@ -2442,7 +2457,7 @@ def replay(self,
self.display(q, v, xyz_offset, update_hook_t, wait)

# Sleep for a while if computing faster than display framerate
sleep(1.0 / DISPLAY_FRAMERATE - (time.time() - time_prev))
sleep(1.0 / REPLAY_FRAMERATE - (time.time() - time_prev))

# Update time in simulation, taking into account speed ratio
time_prev = time.time()
Expand Down

0 comments on commit 9df3afc

Please sign in to comment.