-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
executable file
·40 lines (31 loc) · 1.03 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import io
import tornado.web
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.websocket import WebSocketHandler
from picamera import PiCamera
# Capture settings applied to the camera below.
_CAMERA_ROTATION = 180  # presumably degrees, to flip an upside-down mount -- confirm
_CAMERA_RESOLUTION = (300, 300)  # presumably (width, height) in pixels -- confirm

# Module-level camera instance; created once at import time.
camera = PiCamera()
camera.rotation = _CAMERA_ROTATION
camera.resolution = _CAMERA_RESOLUTION
class WebSocket(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that pushes JPEG camera frames to the client.

    While a connection is open, a ``PeriodicCallback`` captures a frame from
    the module-level ``camera`` every ``FRAME_INTERVAL_MS`` milliseconds and
    sends it to the client as a binary message.
    """

    # Delay between two frame captures, in milliseconds.
    FRAME_INTERVAL_MS = 100

    # Set by open(); stays None if the connection never finished opening,
    # which lets the stop helper be called safely at any time.
    camera_loop = None

    def open(self):
        """Log the new connection and start the periodic frame push."""
        print("[WS]: opened")
        self.camera_loop = PeriodicCallback(self.loop, self.FRAME_INTERVAL_MS)
        self.camera_loop.start()

    def on_message(self, message):
        """Log a message received from the client; it is otherwise ignored."""
        print(f'[WS]: received message: {message}')

    def on_close(self):
        """Log the disconnect and stop capturing frames for this client.

        Stopping here avoids one extra capture after the client has gone;
        previously the callback only stopped once a send attempt failed.
        """
        print("[WS]: closed")
        self._stop_camera_loop()

    def check_origin(self, origin):
        """Accept the connection regardless of the ``origin`` value.

        NOTE(review): returning True unconditionally lets pages from any
        origin open this socket and receive the camera stream. Restrict
        this if the server is reachable from untrusted networks.
        """
        return True

    def loop(self):
        """Capture one JPEG frame and send it as a binary message.

        If the socket turns out to be closed, the periodic callback is
        stopped so that no further frames are captured for this client.
        """
        sio = io.BytesIO()
        camera.capture(sio, "jpeg", use_video_port=True)
        try:
            self.write_message(sio.getvalue(), binary=True)
        except tornado.websocket.WebSocketClosedError:
            self._stop_camera_loop()

    def _stop_camera_loop(self):
        """Stop the periodic capture if it was ever started."""
        if self.camera_loop is not None:
            self.camera_loop.stop()
# Route the site root to the frame-streaming WebSocket handler.
handlers = [(r"/", WebSocket)]
app = tornado.web.Application(handlers)
app.listen(8080)
print('[Server]: Running ...')
try:
    # Runs the event loop; control only comes back when the loop is stopped
    # or an exception (e.g. KeyboardInterrupt on Ctrl-C) propagates out.
    IOLoop.current().start()
finally:
    # Release the camera however the loop ends. The exception, if any, still
    # propagates unchanged after this cleanup.
    camera.close()