-
Notifications
You must be signed in to change notification settings - Fork 62
/
visualizer.py
198 lines (154 loc) · 6.14 KB
/
visualizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import webbrowser
import umsgpack
import numpy as np
import zmq
import io
from PIL import Image
from IPython.display import HTML
from .path import Path
from .commands import SetObject, SetTransform, Delete, SetProperty, SetAnimation, CaptureImage
from .geometry import MeshPhongMaterial
from .servers.zmqserver import start_zmq_server_as_subprocess
class ViewerWindow:
    """
    Client-side connection to a meshcat ZMQ server.

    Holds the ZMQ REQ socket used to talk to the server, the server's web
    URL, and (when this object launched the server itself) the handle
    returned by ``start_zmq_server_as_subprocess``.
    """

    # One ZMQ context shared by every ViewerWindow in the process.
    context = zmq.Context()

    def __init__(self, zmq_url, start_server, server_args):
        """
        Connect to a server, optionally starting one first.

        Args:
            zmq_url: ZMQ URL of the server. Forwarded to
                ``start_zmq_server_as_subprocess`` when ``start_server`` is
                true, otherwise used directly as the address to connect to.
            start_server: when true, launch a server subprocess and use the
                URLs it reports; when false, connect to ``zmq_url`` and ask
                the running server for its web URL.
            server_args: extra arguments forwarded to
                ``start_zmq_server_as_subprocess``.
        """
        if start_server:
            self.server_proc, self.zmq_url, self.web_url = start_zmq_server_as_subprocess(
                zmq_url=zmq_url, server_args=server_args)
        else:
            self.server_proc = None
            self.zmq_url = zmq_url

        self.connect_zmq()

        if not start_server:
            self.web_url = self.request_web_url()
            # Not sure why this is necessary, but requesting the web URL before
            # the websocket connection is made seems to break the receiver
            # callback in the server until we reconnect.
            self.connect_zmq()

        print("You can open the visualizer by visiting the following URL:")
        print(self.web_url)

    def connect_zmq(self):
        """
        (Re)create the REQ socket and connect it to ``self.zmq_url``.

        Any socket left over from an earlier call is closed first, so that
        reconnecting does not leave the old socket open.
        """
        previous = getattr(self, "zmq_socket", None)
        if previous is not None:
            previous.close()
        self.zmq_socket = self.context.socket(zmq.REQ)
        self.zmq_socket.connect(self.zmq_url)

    def request_web_url(self):
        """Ask the server for its web URL and return it as a ``str``."""
        self.zmq_socket.send(b"url")
        response = self.zmq_socket.recv().decode("utf-8")
        return response

    def open(self):
        """Open the web URL in a new browser tab and return ``self``."""
        webbrowser.open(self.web_url, new=2)
        return self

    def wait(self):
        """Send the ``wait`` request and return the server's decoded reply."""
        self.zmq_socket.send(b"wait")
        return self.zmq_socket.recv().decode("utf-8")

    def send(self, command):
        """
        Send one command to the server and block until it replies.

        ``command.lower()`` must return a dict with at least ``"type"`` and
        ``"path"`` string entries; the message is sent as three frames
        (type, path, msgpack-encoded dict). The reply is read only to keep
        the REQ socket's send/recv alternation; its content is discarded.
        """
        cmd_data = command.lower()
        self.zmq_socket.send_multipart([
            cmd_data["type"].encode("utf-8"),
            cmd_data["path"].encode("utf-8"),
            umsgpack.packb(cmd_data)
        ])
        self.zmq_socket.recv()

    def get_scene(self):
        """Get the static HTML from the ZMQ server."""
        self.zmq_socket.send(b"get_scene")
        # we receive the HTML as utf-8-encoded, so decode here
        return self.zmq_socket.recv().decode('utf-8')

    def get_image(self):
        """
        Request an image capture and return it as a ``PIL.Image``.

        The request uses an empty path frame; the reply bytes are opened
        with ``PIL.Image.open``.
        """
        cmd_data = CaptureImage().lower()
        self.zmq_socket.send_multipart([
            cmd_data["type"].encode("utf-8"),
            "".encode("utf-8"),
            umsgpack.packb(cmd_data)
        ])
        img_bytes = self.zmq_socket.recv()
        img = Image.open(io.BytesIO(img_bytes))
        return img

    def close(self):
        """
        Close the ZMQ socket held by this window.

        Safe to call more than once. The shared class-level context and any
        server process referenced by ``self.server_proc`` are left untouched.
        """
        socket = getattr(self, "zmq_socket", None)
        if socket is not None:
            socket.close()
            self.zmq_socket = None
def srcdoc_escape(x):
    """
    Escape an HTML document so it can be embedded in a double-quoted
    ``srcdoc="..."`` attribute.

    Args:
        x: the HTML text to embed.

    Returns:
        ``x`` with ``&`` replaced by ``&amp;`` and ``"`` replaced by
        ``&quot;``.
    """
    # "&" must be handled first; otherwise the "&" introduced by "&quot;"
    # would itself be escaped a second time.
    return x.replace("&", "&amp;").replace('"', "&quot;")
class Visualizer:
    """
    A view onto one path of the scene shown through a viewer window.

    Every command issued through this object is addressed to ``self.path``
    and sent via ``self.window``. Indexing (``vis["child"]``) yields another
    ``Visualizer`` that shares the same window but targets a sub-path.
    """
    __slots__ = ["window", "path"]

    def __init__(self, zmq_url=None, window=None, server_args=None):
        """
        Args:
            zmq_url: ZMQ URL of an already-running server. Only used when
                ``window`` is None; when it is also None a new server is
                started.
            window: an existing window object to reuse. When given,
                ``zmq_url`` and ``server_args`` are ignored.
            server_args: extra arguments for a newly started server.
                ``None`` (the default) means an empty list.
        """
        if window is None:
            # A fresh list per call: a ``[]`` default in the signature would
            # be one object shared by every Visualizer ever constructed.
            if server_args is None:
                server_args = []
            self.window = ViewerWindow(zmq_url=zmq_url, start_server=(zmq_url is None), server_args=server_args)
        else:
            self.window = window
        self.path = Path(("meshcat",))

    @staticmethod
    def view_into(window, path):
        """Return a Visualizer that uses ``window`` and targets ``path``."""
        vis = Visualizer(window=window)
        vis.path = path
        return vis

    def open(self):
        """Open the window (delegates to ``window.open()``) and return ``self``."""
        self.window.open()
        return self

    def url(self):
        """Return the web URL of the underlying window."""
        return self.window.web_url

    def wait(self):
        """
        Block until a browser is connected to the server
        """
        return self.window.wait()

    def jupyter_cell(self, height=400):
        """
        Render the visualizer in a jupyter notebook or jupyterlab cell.

        For this to work, it should be the very last command in the given jupyter
        cell.

        Args:
            height: height of the embedded frame, in pixels.
        """
        return HTML("""
<div style="height: {height}px; width: 100%; overflow-x: auto; overflow-y: hidden; resize: both">
<iframe src="{url}" style="width: 100%; height: 100%; border: none"></iframe>
</div>
""".format(url=self.url(), height=height))

    def render_static(self, height=400):
        """
        Render a static snapshot of the visualizer in a jupyter notebook or
        jupyterlab cell. The resulting snapshot of the visualizer will still be an
        interactive 3D scene, but it won't be affected by any future `set_transform`
        or `set_object` calls.

        Note: this method should work well even when your jupyter kernel is running
        on a different machine or inside a container.

        Args:
            height: height of the embedded frame, in pixels.
        """
        return HTML("""
<div style="height: {height}px; width: 100%; overflow-x: auto; overflow-y: hidden; resize: both">
<iframe srcdoc="{srcdoc}" style="width: 100%; height: 100%; border: none"></iframe>
</div>
""".format(srcdoc=srcdoc_escape(self.static_html()), height=height))

    def __getitem__(self, path):
        """Return a Visualizer for ``path`` appended to this one's path."""
        return Visualizer.view_into(self.window, self.path.append(path))

    def set_object(self, geometry, material=None):
        """Send a ``SetObject`` command for ``geometry``/``material`` at this path."""
        return self.window.send(SetObject(geometry, material, self.path))

    def set_transform(self, matrix=None):
        """
        Send a ``SetTransform`` command for this path.

        Args:
            matrix: the transform to apply. ``None`` (the default) means the
                4x4 identity, ``np.eye(4)``.
        """
        if matrix is None:
            # Built per call so no array object is shared between calls.
            matrix = np.eye(4)
        return self.window.send(SetTransform(matrix, self.path))

    def set_property(self, key, value):
        """Send a ``SetProperty`` command setting ``key`` to ``value`` at this path."""
        return self.window.send(SetProperty(key, value, self.path))

    def set_animation(self, animation, play=True, repetitions=1):
        """
        Send a ``SetAnimation`` command.

        Note that, unlike the other setters, no path is passed to the command.
        """
        return self.window.send(SetAnimation(animation, play=play, repetitions=repetitions))

    def get_image(self):
        """Return an image obtained from the window via ``window.get_image()``."""
        return self.window.get_image()

    def delete(self):
        """Send a ``Delete`` command for this path."""
        return self.window.send(Delete(self.path))

    def close(self):
        """Close the underlying window (delegates to ``window.close()``)."""
        self.window.close()

    def static_html(self):
        """
        Return a static HTML blob that standalone encompasses the visualizer and contents.

        Ask the server for the scene (since the server knows it), and return it all as an
        HTML string for future use. Nothing is written to disk here.
        """
        return self.window.get_scene()

    def __repr__(self):
        return "<Visualizer using: {window} at path: {path}>".format(window=self.window, path=self.path)
if __name__ == '__main__':
    # Script entry point: connect to (or start) a server and keep running.
    # Usage: visualizer.py [zmq_url [server_arg ...]]
    import time
    import sys

    args = []
    if len(sys.argv) > 1:
        zmq_url = sys.argv[1]
        if len(sys.argv) > 2:
            args = sys.argv[2:]
    else:
        zmq_url = None

    # ViewerWindow.__init__ takes exactly (zmq_url, start_server, server_args).
    # A server is started only when no URL was given on the command line.
    window = ViewerWindow(zmq_url, zmq_url is None, args)

    # Keep the process alive until it is interrupted.
    while True:
        time.sleep(100)