-
-
Notifications
You must be signed in to change notification settings - Fork 568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zigbee suqare button and cube support via fake_device #709
Changes from 4 commits
662e35d
485da59
0f47461
a83ed86
2849a2e
d7d6367
8c23c5b
d8cae75
360658a
9df0fd0
1862910
1aba320
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import calendar | ||
import datetime | ||
import logging | ||
import socket | ||
import struct | ||
from functools import reduce | ||
|
||
import ifaddr | ||
from miio.protocol import Message | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def ipv4_nonloop_ips(): | ||
def flatten(a, b): | ||
a.extend(b) | ||
return a | ||
|
||
return list( | ||
filter( | ||
lambda ip: isinstance(ip, str) and not ip.startswith("127"), | ||
map( | ||
lambda ip: ip.ip, | ||
reduce( | ||
flatten, map(lambda adapter: adapter.ips, ifaddr.get_adapters()), [] | ||
), | ||
), | ||
) | ||
) | ||
|
||
|
||
class FakeDevice: | ||
_device_id = None | ||
_address = None | ||
_token = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need to define these as class attributes. |
||
|
||
def __init__(self, device_id, token, address="0.0.0.0"): | ||
self._device_id = device_id | ||
self._token = token | ||
self._address = address | ||
|
||
def run(self, callback): | ||
def build_ack(device: int): | ||
# Original devices are using year 1970, but it seems current datetime is fine | ||
timestamp = calendar.timegm(datetime.datetime.now().timetuple()) | ||
# ACK packet not signed, 16 bytes header + 16 bytes of zeroes | ||
return struct.pack(">HHIII16s", 0x2131, 32, 0, device, timestamp, bytes(16)) | ||
|
||
helobytes = bytes.fromhex( | ||
"21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff" | ||
) | ||
|
||
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) | ||
# Gateway interacts only with port 54321 | ||
sock.bind((self._address, 54321)) | ||
_LOGGER.info( | ||
"fake miio device started with address=%s device_id=%s callback=%s token=****", | ||
self._address, | ||
self._device_id, | ||
callback, | ||
) | ||
|
||
while True: | ||
data, [host, port] = sock.recvfrom(1024) | ||
if data == helobytes: | ||
_LOGGER.debug("%s:%s=>PING", host, port) | ||
m = build_ack(self._device_id) | ||
sock.sendto(m, (host, port)) | ||
_LOGGER.debug("%s:%s<=ACK(device_id=%s)", host, port, self._device_id) | ||
else: | ||
request = Message.parse(data, token=self._token) | ||
value = request.data.value | ||
_LOGGER.debug("%s:%s=>%s", host, port, value) | ||
action, device_call_id = value["method"].split("_") | ||
source_device_id = ( | ||
f"lumi.{device_call_id}" # All known devices use lumi. prefix | ||
) | ||
callback(source_device_id, action, value["params"]) | ||
# This result means OK, but some methods return ['ok'] instead of 0 | ||
# might be necessary to use different results for different methods | ||
result = {"result": 0, "id": value["id"]} | ||
header = { | ||
"length": 0, | ||
"unknown": 0, | ||
"device_id": self._device_id, | ||
"ts": datetime.datetime.now(), | ||
} | ||
msg = { | ||
"data": {"value": result}, | ||
"header": {"value": header}, | ||
"checksum": 0, | ||
} | ||
response = Message.build(msg, token=self._token) | ||
_LOGGER.debug("%s:%s<=%s", host, port, result) | ||
sock.sendto(response, (host, port)) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
def callback(source_device, action, params): | ||
_LOGGER.debug(f"CALLBACK {source_device}=>{action}({params})") | ||
|
||
from gateway_scripts import tokens, fake_device_id | ||
|
||
logging.basicConfig(level="DEBUG") | ||
|
||
device_id = int(fake_device_id) | ||
device_token = bytes.fromhex(tokens["real"]) | ||
fake_device = FakeDevice(device_id, device_token) | ||
fake_device.run(callback) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,21 @@ | |
from .click_common import EnumType, command, format_output | ||
from .device import Device | ||
from .exceptions import DeviceException | ||
from .fake_device import ipv4_nonloop_ips | ||
from .gateway_scripts import ( | ||
action_id, | ||
build_doublepress, | ||
build_flip90, | ||
build_flip180, | ||
build_longpress, | ||
build_move, | ||
build_rotate, | ||
build_shake, | ||
build_shakeair, | ||
build_singlepress, | ||
build_taptap, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is waaaaay too specific to be inside the gateway class. We need a way to expose subdevice based actions some other way. |
||
tokens, | ||
) | ||
from .utils import brightness_and_color_to_int, int_to_brightness, int_to_rgb | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
@@ -161,6 +176,8 @@ def discover_devices(self): | |
DeviceType.Plug: AqaraPlug, | ||
DeviceType.SensorHT: SensorHT, | ||
DeviceType.AqaraHT: AqaraHT, | ||
DeviceType.Cube: Cube, | ||
DeviceType.AqaraSquareButton: AqaraSquareButton, | ||
DeviceType.AqaraMagnet: AqaraMagnet, | ||
DeviceType.AqaraSwitchOneChannel: AqaraSwitchOneChannel, | ||
DeviceType.AqaraSwitchTwoChannels: AqaraSwitchTwoChannels, | ||
|
@@ -201,6 +218,38 @@ def discover_devices(self): | |
|
||
return self._devices | ||
|
||
def x_del(self, script_id): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would rename "x_del" to "unsubscribe_event" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. x_del is name of method... python method name mimics MIIO method... Less conversions => more comprehension... x_del is not executed directly, it's low-level method which is called from uninstall ... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. alright I understand, you are probably right if this function is only called by other functions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that this should be named otherwise, especially if this can be useful for other use cases. Otherwise it is probably simpler to simply use |
||
"""Delete script by id.""" | ||
return self.send("miIO.xdel", [script_id]) | ||
|
||
@command(click.argument("sid"), click.argument("command")) | ||
def subdevice_command(self, sid, command): | ||
self.discover_devices() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary to do the discovery for this? If yes, I think it would be better to modify the discover_devices to return a dictionary of |
||
target = list(filter(lambda subdevice: subdevice.sid == sid, self.devices)) | ||
if len(target) < 1: | ||
return f"Device with sid={sid} not found" | ||
elif not hasattr(target[0], command): | ||
return f"Device with sid={sid} has no method {command}" | ||
else: | ||
return getattr(target[0], command)() | ||
|
||
def install_script(self, sid, action, builder): | ||
bskaplou marked this conversation as resolved.
Show resolved
Hide resolved
|
||
addresses = ipv4_nonloop_ips() | ||
my_ip = addresses[0] # Taking first public IP ;( | ||
_LOGGER.info("Using address %s for action %s of %s", my_ip, action, sid) | ||
data_tkn = tokens["data_tkn"] | ||
source = builder(sid, my_ip) | ||
return self.send( | ||
"send_data_frame", | ||
{ | ||
"cur": 0, | ||
"data": source, | ||
"data_tkn": data_tkn, | ||
"total": 1, | ||
"type": "scene", | ||
}, | ||
) | ||
|
||
@command(click.argument("property")) | ||
def get_prop(self, property): | ||
"""Get the value of a property for given sid.""" | ||
|
@@ -738,6 +787,21 @@ def get_firmware_version(self) -> Optional[int]: | |
) | ||
return self._fw_ver | ||
|
||
@command() | ||
def uninstall_scripts(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this uninstalls all scripts right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is actual installation of program into gateway memory.... installation and deletion... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you explain what you can do with loading such a "program" into gateway memory? It would be really nice if you could include docstrings """explanation"""" for all new functions that briefly explain what the function can do and how it works. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
docs are added for all methods There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Subscription mechanism can be implement by scripts if you use target with ip is an ip of python-miio running device... But in xiaomi scripts are not expected to be used as callbacks. It was designed to make gateway implement complete automation listen=>process=>send data to other devices... Such behavior is still possbile. You can create script which turns on wi-fi power plug in cube rotation without processing of callback in hass. |
||
return dict( | ||
map( | ||
lambda action: ( | ||
action, | ||
( | ||
action_id[action](self.sid), | ||
self._gw.x_del(action_id[action](self.sid)), | ||
), | ||
), | ||
action_id.keys(), | ||
) | ||
) | ||
|
||
|
||
class AqaraHT(SubDevice): | ||
"""Subdevice AqaraHT specific properties and methods""" | ||
|
@@ -921,3 +985,55 @@ def update(self): | |
self._props.status_ch0 = values[0] | ||
self._props.status_ch1 = values[1] | ||
self._props.load_power = values[2] | ||
|
||
|
||
class Cube(SubDevice): | ||
"""Subdevice Cube specific properties and methods""" | ||
|
||
properties = [] | ||
|
||
@command() | ||
def install_move_script(self): | ||
return self._gw.install_script(self.sid, "move", build_move) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would rename all of these to "subscribe_to_move" etc. maybe consider removing all these individual functions and just make one list "events" like the "properties" that contains all events that can be subscribed to ["move", "rotate", "shakeair", "flip90", "taptap", "flip180"] and then just make one function "subscribe_to_all_events" that subscibes to all events in a for loop. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed most of code duplication... Further reducing requires deep work with click.... |
||
|
||
@command() | ||
def install_rotate_script(self): | ||
return self._gw.install_script(self.sid, "rotate", build_rotate) | ||
|
||
@command() | ||
def install_shake_script(self): | ||
return self._gw.install_script(self.sid, "shakeair", build_shakeair) | ||
|
||
@command() | ||
def install_flip90_script(self): | ||
return self._gw.install_script(self.sid, "flip90", build_flip90) | ||
|
||
@command() | ||
def install_taptap_script(self): | ||
return self._gw.install_script(self.sid, "taptap", build_taptap) | ||
|
||
@command() | ||
def install_flip180_script(self): | ||
return self._gw.install_script(self.sid, "flip180", build_flip180) | ||
|
||
|
||
class AqaraSquareButton(SubDevice): | ||
"""Subdevice AqaraSquareButton specific properties and methods""" | ||
|
||
properties = [] | ||
|
||
@command() | ||
def install_singlepress_script(self): | ||
return self._gw.install_script(self.sid, "singlepress", build_singlepress) | ||
|
||
@command() | ||
def install_doublepress_script(self): | ||
return self._gw.install_script(self.sid, "doublepress", build_doublepress) | ||
|
||
@command() | ||
def install_longpress_script(self): | ||
return self._gw.install_script(self.sid, "longpress", build_longpress) | ||
|
||
@command() | ||
def install_shake_script(self): | ||
return self._gw.install_script(self.sid, "shake", build_shake) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reads really, really complicated.. What for this is and is it necessary? I suppose for binding on multiple interfaces? If yes, that could also be potentially useful for discovery (iirc there is an issue about not being able to discover devices from other subnets). Is there maybe a library that could be used to achieve this?