-
-
Notifications
You must be signed in to change notification settings - Fork 569
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zigbee suqare button and cube support via fake_device #709
Changes from all commits
662e35d
485da59
0f47461
a83ed86
2849a2e
d7d6367
8c23c5b
d8cae75
360658a
9df0fd0
1862910
1aba320
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import calendar | ||
import datetime | ||
import logging | ||
import socket | ||
import struct | ||
from functools import reduce | ||
|
||
import sys | ||
import ifaddr | ||
from miio.protocol import Message | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def ipv4_nonloop_ips(): | ||
def flatten(a, b): | ||
a.extend(b) | ||
return a | ||
|
||
return list( | ||
filter( | ||
lambda ip: isinstance(ip, str) and not ip.startswith("127"), | ||
map( | ||
lambda ip: ip.ip, | ||
reduce( | ||
flatten, map(lambda adapter: adapter.ips, ifaddr.get_adapters()), [] | ||
), | ||
), | ||
) | ||
) | ||
|
||
|
||
class FakeDevice: | ||
_device_id = None | ||
_address = None | ||
_token = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need to define these as class attributes. |
||
|
||
def __init__(self, device_id, token, address="0.0.0.0"): | ||
self._device_id = device_id | ||
self._token = token | ||
self._address = address | ||
|
||
def run(self, callback): | ||
def build_ack(device: int): | ||
# Original devices are using year 1970, but it seems current datetime is fine | ||
timestamp = calendar.timegm(datetime.datetime.now().timetuple()) | ||
# ACK packet not signed, 16 bytes header + 16 bytes of zeroes | ||
return struct.pack(">HHIII16s", 0x2131, 32, 0, device, timestamp, bytes(16)) | ||
|
||
helobytes = bytes.fromhex( | ||
"21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff" | ||
) | ||
|
||
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) | ||
# Gateway interacts only with port 54321 | ||
sock.bind((self._address, 54321)) | ||
_LOGGER.info( | ||
"fake miio device started with address=%s device_id=%s callback=%s token=****", | ||
self._address, | ||
self._device_id, | ||
callback, | ||
) | ||
|
||
while True: | ||
data, [host, port] = sock.recvfrom(1024) | ||
if data == helobytes: | ||
_LOGGER.debug("%s:%s=>PING", host, port) | ||
m = build_ack(self._device_id) | ||
sock.sendto(m, (host, port)) | ||
_LOGGER.debug("%s:%s<=ACK(device_id=%s)", host, port, self._device_id) | ||
else: | ||
request = Message.parse(data, token=self._token) | ||
value = request.data.value | ||
_LOGGER.debug("%s:%s=>%s", host, port, value) | ||
action, device_call_id = value["method"].split("_") | ||
source_device_id = ( | ||
f"lumi.{device_call_id}" # All known devices use lumi. prefix | ||
) | ||
callback(source_device_id, action, value["params"]) | ||
# This result means OK, but some methods return ['ok'] instead of 0 | ||
# might be necessary to use different results for different methods | ||
result = {"result": 0, "id": value["id"]} | ||
header = { | ||
"length": 0, | ||
"unknown": 0, | ||
"device_id": self._device_id, | ||
"ts": datetime.datetime.now(), | ||
} | ||
msg = { | ||
"data": {"value": result}, | ||
"header": {"value": header}, | ||
"checksum": 0, | ||
} | ||
response = Message.build(msg, token=self._token) | ||
_LOGGER.debug("%s:%s<=%s", host, port, result) | ||
sock.sendto(response, (host, port)) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
def callback(source_device, action, params): | ||
_LOGGER.debug(f"CALLBACK {source_device}=>{action}({params})") | ||
|
||
from gateway_scripts import fake_device_id | ||
|
||
logging.basicConfig(level="DEBUG") | ||
|
||
device_id = int(fake_device_id) | ||
# Use real token on fake device for encryption | ||
# encoded is used in scripts = key pair should match | ||
# tokens = { | ||
# "real": "9bc7c7ce6291d3e443fd7708608b9892", | ||
# "encoded": "79cf21b08fb051499389f23c113477a4", | ||
# } | ||
if len(sys.argv) > 1: | ||
device_token = bytes.fromhex(sys.argv[1]) | ||
else: | ||
print( | ||
"WARNING kae device starting with publically known token! Pass other token next time pls..." | ||
) | ||
device_token = bytes.fromhex("9bc7c7ce6291d3e443fd7708608b9892") | ||
fake_device = FakeDevice(device_id, device_token) | ||
fake_device.run(callback) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,13 +4,28 @@ | |
from datetime import datetime | ||
from enum import Enum, IntEnum | ||
from typing import Optional | ||
from random import randrange | ||
|
||
import attr | ||
import click | ||
|
||
from .click_common import EnumType, command, format_output | ||
from .device import Device | ||
from .exceptions import DeviceException | ||
from .fake_device import ipv4_nonloop_ips | ||
from .gateway_scripts import ( | ||
action_id, | ||
build_doublepress, | ||
build_flip90, | ||
build_flip180, | ||
build_longpress, | ||
build_move, | ||
build_rotate, | ||
build_shake, | ||
build_shakeair, | ||
build_singlepress, | ||
build_taptap, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is waaaaay too specific to be inside the gateway class. We need a way to expose subdevice based actions some other way. |
||
) | ||
from .utils import brightness_and_color_to_int, int_to_brightness, int_to_rgb | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
@@ -166,6 +181,8 @@ def discover_devices(self): | |
DeviceType.Plug: AqaraPlug, | ||
DeviceType.SensorHT: SensorHT, | ||
DeviceType.AqaraHT: AqaraHT, | ||
DeviceType.Cube: Cube, | ||
DeviceType.AqaraSquareButton: AqaraSquareButton, | ||
DeviceType.AqaraMagnet: AqaraMagnet, | ||
DeviceType.AqaraSwitchOneChannel: AqaraSwitchOneChannel, | ||
DeviceType.AqaraSwitchTwoChannels: AqaraSwitchTwoChannels, | ||
|
@@ -207,6 +224,47 @@ def discover_devices(self): | |
|
||
return self._devices | ||
|
||
def x_del(self, script_id): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would rename "x_del" to "unsubscribe_event" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. x_del is name of method... python method name mimics MIIO method... Less conversions => more comprehension... x_del is not executed directly, it's low-level method which is called from uninstall ... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. alright I understand, you are probably right if this function is only called by other functions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that this should be named otherwise, especially if this can be useful for other use cases. Otherwise it is probably simpler to simply use |
||
"""Delete script by id.""" | ||
return self.send("miIO.xdel", [script_id]) | ||
|
||
@command( | ||
click.argument("sid"), | ||
click.argument("command"), | ||
click.argument("encoded_token"), | ||
) | ||
def subdevice_command(self, sid, command, encoded_token): | ||
"""Send command to subdevice.""" | ||
self.discover_devices() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary to do the discovery for this? If yes, I think it would be better to modify the discover_devices to return a dictionary of |
||
target = list(filter(lambda subdevice: subdevice.sid == sid, self.devices)) | ||
if len(target) < 1: | ||
return f"Device with sid={sid} not found" | ||
elif not hasattr(target[0], command): | ||
return f"Device with sid={sid} has no method {command}" | ||
else: | ||
return getattr(target[0], command)(encoded_token) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Usage of |
||
|
||
def install_script(self, sid, builder, encoded_token, ip=None): | ||
"""Install script for by building script source and sending it with miio method. You need to run fake or real device to capture script execution results.""" | ||
if ip is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to see this being much simpler, in my opinion the API should be more something like:
Likewise, if I have a cube instance, I would like to query the list of available events and already installed scripts:
|
||
addresses = ipv4_nonloop_ips() | ||
my_ip = addresses[0] # Taking first public IP | ||
else: | ||
my_ip = ip | ||
|
||
data_tkn = randrange(5000, 10000) | ||
source = builder(sid, my_ip, encoded_token) | ||
return self.send( | ||
"send_data_frame", | ||
{ | ||
"cur": 0, | ||
"data": source, | ||
"data_tkn": data_tkn, | ||
"total": 1, | ||
"type": "scene", | ||
}, | ||
) | ||
|
||
@command(click.argument("property")) | ||
def get_prop(self, property): | ||
"""Get the value of a property for given sid.""" | ||
|
@@ -744,6 +802,21 @@ def get_firmware_version(self) -> Optional[int]: | |
) | ||
return self._fw_ver | ||
|
||
@command() | ||
def uninstall_scripts(self, encoded_token): | ||
return dict( | ||
map( | ||
lambda action: ( | ||
action, | ||
( | ||
action_id[action](self.sid), | ||
self._gw.x_del(action_id[action](self.sid)), | ||
), | ||
), | ||
action_id.keys(), | ||
) | ||
) | ||
Comment on lines
+806
to
+818
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is way coo complicated and rather hard to interpret. Is the goal to remove all scripts? Is there a way to get a list of current events, that could be iterated and then removed? How does removal/overwriting of a single script look like? How is the encoded token used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes... goal is to remove all scripts at least potentially could be installed in device... As far as i get list of installed scripts is not exposed through miio protocol... Here I xdel all scripts for all possible scripts to avoid memory leak on gw Encoded token is used during script installation and somehow linked with real token. Unfortunately algo is unknown... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I think there were some sort of list functionality related to scenes in the gateway implementation at some point, that could be related to this? Anyway, I pointed out the encoded_token here as it is given as a parameter but never used in the method itself? |
||
|
||
|
||
class AqaraHT(SubDevice): | ||
"""Subdevice AqaraHT specific properties and methods""" | ||
|
@@ -928,6 +1001,67 @@ def update(self): | |
self._props.status_ch1 = values[1] | ||
self._props.load_power = values[2] | ||
|
||
class Cube(SubDevice): | ||
"""Subdevice Cube specific properties and methods""" | ||
|
||
properties = [] | ||
|
||
@command() | ||
def install_move_script(self, encoded_token): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All these installs should probably be just a single method in The subdevices themselves should have a class attribute defining the available events. So in the case of the cube, the Cube class should define that it has events:
The users of the lib can then request available events, and create a script for those. |
||
"""Generate and install script which captures move event and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_move, encoded_token) | ||
|
||
@command() | ||
def install_rotate_script(self, encoded_token): | ||
"""Generate and install script which captures rotate event and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_rotate, encoded_token) | ||
|
||
@command() | ||
def install_shake_script(self, encoded_token): | ||
"""Generate and install script which captures shake in air event and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_shakeair, encoded_token) | ||
|
||
@command() | ||
def install_flip90_script(self, encoded_token): | ||
"""Generate and install script which captures horizontal 90 flip and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_flip90, encoded_token) | ||
|
||
@command() | ||
def install_taptap_script(self, encoded_token): | ||
"""Generate and install script which captures double tap on surface event and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_taptap, encoded_token) | ||
|
||
@command() | ||
def install_flip180_script(self, encoded_token): | ||
"""Generate and install script which captures horizontal 180 flip and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_flip180, encoded_token) | ||
|
||
|
||
class AqaraSquareButton(SubDevice): | ||
"""Subdevice AqaraSquareButton specific properties and methods""" | ||
|
||
properties = [] | ||
|
||
@command() | ||
def install_singlepress_script(self, encoded_token): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my other comment about how to expose events to users. |
||
"""Generate and install script which captures single press event and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_singlepress, encoded_token) | ||
|
||
@command() | ||
def install_doublepress_script(self, encoded_token): | ||
"""Generate and install script which captures double press event and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_doublepress, encoded_token) | ||
|
||
@command() | ||
def install_longpress_script(self, encoded_token): | ||
"""Generate and install script which captures loooong press event and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_longpress, encoded_token) | ||
|
||
@command() | ||
def install_shake_script(self, encoded_token): | ||
"""Generate and install script which captures shake in air event and sends miio package to device""" | ||
return self._gw.install_script(self.sid, build_shake, encoded_token) | ||
|
||
|
||
class AqaraWallOutlet(SubDevice): | ||
"""Subdevice AqaraWallOutlet specific properties and methods""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reads really, really complicated.. What for this is and is it necessary? I suppose for binding on multiple interfaces? If yes, that could also be potentially useful for discovery (iirc there is an issue about not being able to discover devices from other subnets). Is there maybe a library that could be used to achieve this?