-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathultimaker2mqtt.py
51 lines (43 loc) · 1.43 KB
/
ultimaker2mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import urllib.request
import time
import json
import paho.mqtt.client as mqtt
# Payloads most recently accepted by the MQTT client, one per topic. A topic is
# republished only when its new payload differs from the value kept here.
lastJob = ""
lastState = ""
lastPrinter = ""

# connect_async() only records the broker address; the connection itself is
# made by the network thread started with loop_start(), so the polling loop
# below may begin before the client is actually connected.
client = mqtt.Client()
client.connect_async("mqtt.rzl.so")
client.loop_start()


def _publish_if_changed(topic, payload, previous):
    """Publish *payload* (retained, QoS 0) on *topic* if it differs from *previous*.

    Returns the value to remember as "last published": *payload* when nothing
    changed or the client accepted the message, otherwise *previous* so the
    next polling cycle tries again.
    """
    if payload == previous:
        return previous
    info = client.publish(topic, payload=payload, qos=0, retain=True)
    if info.rc != mqtt.MQTT_ERR_SUCCESS:
        # The message was not sent (e.g. the client is not connected yet).
        # Keeping the old value makes the next cycle see a change and retry,
        # instead of leaving a stale retained message on the broker.
        print(f"Error publishing to {topic}: rc={info.rc}")
        return previous
    return payload


while True:
    # Fallback documents used whenever the corresponding request fails.
    job = '{"state": "none"}'
    printer = '{"status": "unreachable"}'
    try:
        with urllib.request.urlopen("http://ultimaker.rzl.so/api/v1/printer", timeout=10) as response:
            printer = response.read().decode('utf-8')
        with urllib.request.urlopen("http://ultimaker.rzl.so/api/v1/print_job", timeout=10) as response:
            job = response.read().decode('utf-8')
    except Exception as e:
        # Deliberately broad: any fetch failure must leave the fallbacks in
        # place and keep the polling loop alive.
        print(f"Error: {e}")

    try:
        stateJob = json.loads(job)['state']
        statePrinter = json.loads(printer)['status']
    except (ValueError, KeyError, TypeError) as e:
        # Malformed JSON, a missing key, or a non-object document.
        state = "error"
        print(f"Error parsing JSON: {e}")
        print(job)
    else:
        # The job state takes precedence; without a job the printer status is used.
        state = statePrinter if stateJob == "none" else stateJob
    print(state)

    lastState = _publish_if_changed("/service/ultimaker/state", state, lastState)
    lastJob = _publish_if_changed("/service/ultimaker/job", job, lastJob)
    lastPrinter = _publish_if_changed("/service/ultimaker/printer", printer, lastPrinter)

    time.sleep(10)