-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathbinds.py
181 lines (157 loc) · 5.53 KB
/
binds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import logging
from zigpy.zdo.types import ZDOCmd
LOGGER = logging.getLogger(__name__)
async def bind_group(app, listener, ieee, cmd, data, service):
    """Bind a device's output clusters to a group.

    For every cluster id in (6, 8, 768) the first non-zero endpoint of the
    source device that lists the cluster among its ``out_clusters`` is
    looked up, and a ZDO ``Bind_req`` targeting the group is sent for it.
    Clusters that no endpoint exposes are skipped with a debug message.

    Args:
        app: controller application; only ``get_device(ieee=...)`` is used.
        listener: not used by this command.
        ieee: address of the source device; the command aborts when None.
        cmd: not used by this command.
        data: group id as a hexadecimal string; the command aborts when empty.
        service: only written to the debug log.

    Returns:
        None. Results of the individual requests are logged at debug level.
    """
    from zigpy.zdo.types import MultiAddress
    from zigpy import types as t

    LOGGER.debug("running 'bind group' command: %s", service)
    if ieee is None:
        LOGGER.error("missing ieee")
        return
    # The device is resolved before ``data`` is validated, so an unknown
    # ieee surfaces from get_device() even when ``data`` is missing.
    device = app.get_device(ieee=ieee)
    if not data:
        LOGGER.error("missing cmd_data")
        return

    group_id = int(data, base=16)
    zdo_endpoint = device.zdo

    # Address mode 1 with the group id stored in the ``nwk`` field.
    # NOTE(review): presumably this is Zigbee group addressing - confirm.
    group_addr = MultiAddress()
    group_addr.addrmode = t.uint8_t(1)
    group_addr.nwk = t.uint16_t(group_id)

    # NOTE(review): 6 / 8 / 768 are presumably the On/Off, Level Control and
    # Color Control cluster ids - confirm against the ZCL specification.
    for cluster_id in (6, 8, 768):
        # First endpoint (endpoint 0 excluded) that has the cluster as output.
        endpoint_id = next(
            (
                candidate_id
                for candidate_id, endpoint in device.endpoints.items()
                if candidate_id != 0 and cluster_id in endpoint.out_clusters
            ),
            None,
        )
        if not endpoint_id:
            LOGGER.debug(
                "0x%04x: skipping %s cluster as non present", device.nwk, cluster_id
            )
            continue

        LOGGER.debug(
            "0x%04x: binding %s, ep: %s, cluster: %s",
            device.nwk,
            str(device.ieee),
            endpoint_id,
            cluster_id,
        )
        result = await zdo_endpoint.request(
            ZDOCmd.Bind_req, device.ieee, endpoint_id, cluster_id, group_addr
        )
        LOGGER.debug(
            "0x%04x: binding group 0x%04x: %s", device.nwk, group_id, result
        )
async def unbind_group(app, listener, ieee, cmd, data, service):
    """Remove the bindings between a device's output clusters and a group.

    For every cluster id in (6, 8, 768) the first non-zero endpoint of the
    source device that lists the cluster among its ``out_clusters`` is
    looked up, and a ZDO ``Unbind_req`` targeting the group is sent for it.
    Clusters that no endpoint exposes are skipped with a debug message.

    Args:
        app: controller application; only ``get_device(ieee=...)`` is used.
        listener: not used by this command.
        ieee: address of the source device.
        cmd: not used by this command.
        data: group id as a hexadecimal string.
        service: only written to the debug log.

    Returns:
        None. Results of the individual requests are logged at debug level.
        The command aborts with an error log when ``ieee`` is None or
        ``data`` is empty.
    """
    from zigpy.zdo.types import MultiAddress
    from zigpy import types as t

    LOGGER.debug("running 'unbind group' command: %s", service)
    if ieee is None or not data:
        # The guard covers both arguments, so the message names both
        # (it used to claim only the ieee was missing).
        LOGGER.error("missing ieee and/or data")
        return

    src_dev = app.get_device(ieee=ieee)
    group_id = int(data, base=16)
    zdo = src_dev.zdo

    # Address mode 1 with the group id stored in the ``nwk`` field.
    # NOTE(review): presumably this is Zigbee group addressing - confirm.
    dst_addr = MultiAddress()
    dst_addr.addrmode = t.uint8_t(1)
    dst_addr.nwk = t.uint16_t(group_id)

    # NOTE(review): 6 / 8 / 768 are presumably the On/Off, Level Control and
    # Color Control cluster ids - confirm against the ZCL specification.
    for src_cluster in (6, 8, 768):
        # First endpoint (endpoint 0 excluded) that has the cluster as output.
        src_ep = next(
            (
                ep_id
                for ep_id, ep in src_dev.endpoints.items()
                if ep_id != 0 and src_cluster in ep.out_clusters
            ),
            None,
        )
        if not src_ep:
            LOGGER.debug(
                "0x%04x: skipping %s cluster as non present", src_dev.nwk, src_cluster
            )
            continue

        LOGGER.debug(
            "0x%04x: unbinding %s, ep: %s, cluster: %s",
            src_dev.nwk,
            str(src_dev.ieee),
            src_ep,
            src_cluster,
        )
        res = await zdo.request(
            ZDOCmd.Unbind_req, src_dev.ieee, src_ep, src_cluster, dst_addr
        )
        LOGGER.debug(
            "0x%04x: unbinding group 0x%04x: %s", src_dev.nwk, group_id, res
        )
async def bind_ieee(app, listener, ieee, cmd, data, service):
    """Bind a device's output clusters to another device.

    For every cluster id in (6, 8, 768), each non-zero endpoint of the source
    device that lists the cluster among its ``out_clusters`` is bound, via a
    ZDO ``Bind_req``, to the first non-zero endpoint of the destination
    device that lists the same cluster among its ``in_clusters``. A cluster
    is skipped (with a debug message) when either side does not expose it.

    Args:
        app: controller application; only ``get_device(ieee=...)`` is used.
        listener: not used by this command.
        ieee: address of the source device.
        cmd: not used by this command.
        data: destination address as colon-separated hexadecimal octets.
        service: only written to the debug log.

    Returns:
        None. Results of the individual requests are logged at debug level.
        The command aborts with an error log when ``ieee`` is None or
        ``data`` is empty.
    """
    from zigpy import types as t
    from zigpy.zdo.types import MultiAddress

    LOGGER.debug("running 'bind ieee' command: %s", service)
    if ieee is None or not data:
        # The guard covers both arguments, so the message names both
        # (it used to claim only the ieee was missing).
        LOGGER.error("missing ieee and/or data")
        return

    src_dev = app.get_device(ieee=ieee)
    # NOTE(review): octets are used in the order they appear in the string;
    # confirm this matches the textual byte order of EUI64 in the zigpy
    # version in use.
    dst_ieee = t.EUI64([t.uint8_t(p, base=16) for p in data.split(":")])
    dst_dev = app.get_device(ieee=dst_ieee)
    zdo = src_dev.zdo

    # NOTE(review): 6 / 8 / 768 are presumably the On/Off, Level Control and
    # Color Control cluster ids - confirm against the ZCL specification.
    for src_cluster in (6, 8, 768):
        src_endpoints = [
            ep_id
            for ep_id, ep in src_dev.endpoints.items()
            if ep_id != 0 and src_cluster in ep.out_clusters
        ]
        LOGGER.debug(
            "0x%04x: got the %s endpoints for %s cluster",
            src_dev.nwk,
            src_endpoints,
            src_cluster,
        )
        if not src_endpoints:
            LOGGER.debug(
                "0x%04x: skipping %s cluster as non present", src_dev.nwk, src_cluster
            )
            continue

        # First destination endpoint (endpoint 0 excluded) accepting the
        # cluster as input.
        dst_epid = next(
            (
                ep_id
                for ep_id, ep in dst_dev.endpoints.items()
                if ep_id != 0 and src_cluster in ep.in_clusters
            ),
            None,
        )
        if not dst_epid:
            # Previously skipped silently; log it so a missing binding can
            # be diagnosed from the debug output.
            LOGGER.debug(
                "0x%04x: skipping %s cluster as %s has no endpoint with it as input",
                src_dev.nwk,
                src_cluster,
                str(dst_dev.ieee),
            )
            continue

        # Address mode 3 carries the destination ieee plus an endpoint.
        dst_addr = MultiAddress()
        dst_addr.addrmode = t.uint8_t(3)
        dst_addr.ieee = dst_dev.ieee
        dst_addr.endpoint = t.uint8_t(dst_epid)

        for src_ep in src_endpoints:
            LOGGER.debug(
                "0x%04x: binding %s, ep: %s, cluster: %s to %s dev %s ep",
                src_dev.nwk,
                str(src_dev.ieee),
                src_ep,
                src_cluster,
                str(dst_dev.ieee),
                dst_epid,
            )
            res = await zdo.request(
                ZDOCmd.Bind_req, src_dev.ieee, src_ep, src_cluster, dst_addr
            )
            LOGGER.debug(
                "0x%04x: binding ieee %s: %s", src_dev.nwk, str(dst_dev.ieee), res
            )
async def unbind_coordinator(app, listener, ieee, cmd, data, service):
    """Call ``unbind()`` on one output cluster of every endpoint of a device.

    Every endpoint with a truthy id whose ``out_clusters`` contains the
    requested cluster id has that cluster's ``unbind()`` awaited; the result
    is written to the debug log.
    NOTE(review): judging by the command name this removes the binding to
    the coordinator - confirm against the cluster's ``unbind()`` semantics.

    Args:
        app: controller application; only ``get_device(ieee=...)`` is used.
        listener: not used by this command.
        ieee: address of the device to operate on.
        cmd: not used by this command.
        data: cluster id, parsed with ``int(data)`` (i.e. decimal for strings).
        service: only written to the debug log.

    Returns:
        None. The command aborts with an error log when ``ieee`` is None or
        ``data`` is empty.
    """
    LOGGER.debug("running 'unbind coordinator' command: %s", service)
    if ieee is None or not data:
        LOGGER.error("missing ieee and/or data")
        return

    device = app.get_device(ieee=ieee)
    cluster_id = int(data)

    for endpoint_id, endpoint in device.endpoints.items():
        # Only endpoints with a non-zero id that expose the cluster as output.
        if endpoint_id and cluster_id in endpoint.out_clusters:
            LOGGER.debug(
                "0x%04x: unbinding ep: %s, cluster: %s",
                device.nwk,
                endpoint_id,
                cluster_id,
            )
            result = await endpoint.out_clusters[cluster_id].unbind()
            LOGGER.debug(
                "0x%04x: unbinding 0x%04x: %s", device.nwk, cluster_id, result
            )