Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zigbee suqare button and cube support via fake_device #709

Closed
wants to merge 12 commits into from
110 changes: 110 additions & 0 deletions miio/fake_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import calendar
import datetime
import logging
import socket
import struct
from functools import reduce

import ifaddr
from miio.protocol import Message

_LOGGER = logging.getLogger(__name__)


def ipv4_nonloop_ips():
def flatten(a, b):
a.extend(b)
return a

return list(
filter(
lambda ip: isinstance(ip, str) and not ip.startswith("127"),
map(
lambda ip: ip.ip,
reduce(
flatten, map(lambda adapter: adapter.ips, ifaddr.get_adapters()), []
),
),
)
)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads really, really complicated.. What for this is and is it necessary? I suppose for binding on multiple interfaces? If yes, that could also be potentially useful for discovery (iirc there is an issue about not being able to discover devices from other subnets). Is there maybe a library that could be used to achieve this?



class FakeDevice:
_device_id = None
_address = None
_token = None
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to define these as class attributes.


def __init__(self, device_id, token, address="0.0.0.0"):
self._device_id = device_id
self._token = token
self._address = address

def run(self, callback):
def build_ack(device: int):
# Original devices are using year 1970, but it seems current datetime is fine
timestamp = calendar.timegm(datetime.datetime.now().timetuple())
# ACK packet not signed, 16 bytes header + 16 bytes of zeroes
return struct.pack(">HHIII16s", 0x2131, 32, 0, device, timestamp, bytes(16))

helobytes = bytes.fromhex(
"21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
)

sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
# Gateway interacts only with port 54321
sock.bind((self._address, 54321))
_LOGGER.info(
"fake miio device started with address=%s device_id=%s callback=%s token=****",
self._address,
self._device_id,
callback,
)

while True:
data, [host, port] = sock.recvfrom(1024)
if data == helobytes:
_LOGGER.debug("%s:%s=>PING", host, port)
m = build_ack(self._device_id)
sock.sendto(m, (host, port))
_LOGGER.debug("%s:%s<=ACK(device_id=%s)", host, port, self._device_id)
else:
request = Message.parse(data, token=self._token)
value = request.data.value
_LOGGER.debug("%s:%s=>%s", host, port, value)
action, device_call_id = value["method"].split("_")
source_device_id = (
f"lumi.{device_call_id}" # All known devices use lumi. prefix
)
callback(source_device_id, action, value["params"])
# This result means OK, but some methods return ['ok'] instead of 0
# might be necessary to use different results for different methods
result = {"result": 0, "id": value["id"]}
header = {
"length": 0,
"unknown": 0,
"device_id": self._device_id,
"ts": datetime.datetime.now(),
}
msg = {
"data": {"value": result},
"header": {"value": header},
"checksum": 0,
}
response = Message.build(msg, token=self._token)
_LOGGER.debug("%s:%s<=%s", host, port, result)
sock.sendto(response, (host, port))


if __name__ == "__main__":

def callback(source_device, action, params):
_LOGGER.debug(f"CALLBACK {source_device}=>{action}({params})")

from gateway_scripts import tokens, fake_device_id

logging.basicConfig(level="DEBUG")

device_id = int(fake_device_id)
device_token = bytes.fromhex(tokens["real"])
fake_device = FakeDevice(device_id, device_token)
fake_device.run(callback)
116 changes: 116 additions & 0 deletions miio/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@
from .click_common import EnumType, command, format_output
from .device import Device
from .exceptions import DeviceException
from .fake_device import ipv4_nonloop_ips
from .gateway_scripts import (
action_id,
build_doublepress,
build_flip90,
build_flip180,
build_longpress,
build_move,
build_rotate,
build_shake,
build_shakeair,
build_singlepress,
build_taptap,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is waaaaay too specific to be inside the gateway class. We need a way to expose subdevice based actions some other way.

tokens,
)
from .utils import brightness_and_color_to_int, int_to_brightness, int_to_rgb

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -161,6 +176,8 @@ def discover_devices(self):
DeviceType.Plug: AqaraPlug,
DeviceType.SensorHT: SensorHT,
DeviceType.AqaraHT: AqaraHT,
DeviceType.Cube: Cube,
DeviceType.AqaraSquareButton: AqaraSquareButton,
DeviceType.AqaraMagnet: AqaraMagnet,
DeviceType.AqaraSwitchOneChannel: AqaraSwitchOneChannel,
DeviceType.AqaraSwitchTwoChannels: AqaraSwitchTwoChannels,
Expand Down Expand Up @@ -201,6 +218,38 @@ def discover_devices(self):

return self._devices

def x_del(self, script_id):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename "x_del" to "unsubscribe_event"
and "script_id" to "subscription_id"

Copy link
Contributor Author

@bskaplou bskaplou May 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

x_del is name of method... python method name mimics MIIO method... Less conversions => more comprehension... x_del is not executed directly, it's low-level method which is called from uninstall ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright I understand, you are probably right if this function is only called by other functions

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this should be named otherwise, especially if this can be useful for other use cases. Otherwise it is probably simpler to simply use self.send where necessary (or at least rename this to something more descriptive, and prefix it with _ to indicate that it's a private method not to be used by others.

"""Delete script by id."""
return self.send("miIO.xdel", [script_id])

@command(click.argument("sid"), click.argument("command"))
def subdevice_command(self, sid, command):
self.discover_devices()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to do the discovery for this? If yes, I think it would be better to modify the discover_devices to return a dictionary of {sid: SubDevice}. This would allow dropping the next line and make it much more easier to comprehend.

target = list(filter(lambda subdevice: subdevice.sid == sid, self.devices))
if len(target) < 1:
return f"Device with sid={sid} not found"
elif not hasattr(target[0], command):
return f"Device with sid={sid} has no method {command}"
else:
return getattr(target[0], command)()

def install_script(self, sid, action, builder):
bskaplou marked this conversation as resolved.
Show resolved Hide resolved
addresses = ipv4_nonloop_ips()
my_ip = addresses[0] # Taking first public IP ;(
_LOGGER.info("Using address %s for action %s of %s", my_ip, action, sid)
data_tkn = tokens["data_tkn"]
source = builder(sid, my_ip)
return self.send(
"send_data_frame",
{
"cur": 0,
"data": source,
"data_tkn": data_tkn,
"total": 1,
"type": "scene",
},
)

@command(click.argument("property"))
def get_prop(self, property):
"""Get the value of a property for given sid."""
Expand Down Expand Up @@ -738,6 +787,21 @@ def get_firmware_version(self) -> Optional[int]:
)
return self._fw_ver

@command()
def uninstall_scripts(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this uninstalls all scripts right?
I would then rename this to "Unsubscribe_all_events"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actual installation of program into gateway memory.... installation and deletion...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain what you can do with loading such a "program" into gateway memory?
Can you only get callbacks when an event happens/property changes or can it do more?

It would be really nice if you could include docstrings """explanation"""" for all new functions that briefly explain what the function can do and how it works.

Copy link
Contributor Author

@bskaplou bskaplou May 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain what you can do with loading such a "program" into gateway memory?
Gateway is able to store limited amount of such "scripts". Scripts are permanent upon restarts. External device call parameters are also encoded with script (i.e. rotate)...
For each devices receiving messages from scripts gateway maintains state... It means gateway is sending ping requests and will not send actual packages in pings are faulty..

Can you only get callbacks when an event happens/property changes or can it do more?
Unfortunately we have no callbacks... Script allows to send package to other miio device as a result of action....

docs are added for all methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subscription mechanism can be implement by scripts if you use target with ip is an ip of python-miio running device... But in xiaomi scripts are not expected to be used as callbacks. It was designed to make gateway implement complete automation listen=>process=>send data to other devices... Such behavior is still possbile. You can create script which turns on wi-fi power plug in cube rotation without processing of callback in hass.

return dict(
map(
lambda action: (
action,
(
action_id[action](self.sid),
self._gw.x_del(action_id[action](self.sid)),
),
),
action_id.keys(),
)
)


class AqaraHT(SubDevice):
"""Subdevice AqaraHT specific properties and methods"""
Expand Down Expand Up @@ -921,3 +985,55 @@ def update(self):
self._props.status_ch0 = values[0]
self._props.status_ch1 = values[1]
self._props.load_power = values[2]


class Cube(SubDevice):
"""Subdevice Cube specific properties and methods"""

properties = []

@command()
def install_move_script(self):
return self._gw.install_script(self.sid, "move", build_move)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename all of these to "subscribe_to_move" etc.
And make an aditional function "subscribe_to_all_events" that calls all these functions in one go.

maybe consider removing all these individual functions and just make one list "events" like the "properties" that contains all events that can be subscribed to ["move", "rotate", "shakeair", "flip90", "taptap", "flip180"] and then just make one function "subscribe_to_all_events" that subscibes to all events in a for loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed most of code duplication... Further reducing requires deep work with click....


@command()
def install_rotate_script(self):
return self._gw.install_script(self.sid, "rotate", build_rotate)

@command()
def install_shake_script(self):
return self._gw.install_script(self.sid, "shakeair", build_shakeair)

@command()
def install_flip90_script(self):
return self._gw.install_script(self.sid, "flip90", build_flip90)

@command()
def install_taptap_script(self):
return self._gw.install_script(self.sid, "taptap", build_taptap)

@command()
def install_flip180_script(self):
return self._gw.install_script(self.sid, "flip180", build_flip180)


class AqaraSquareButton(SubDevice):
"""Subdevice AqaraSquareButton specific properties and methods"""

properties = []

@command()
def install_singlepress_script(self):
return self._gw.install_script(self.sid, "singlepress", build_singlepress)

@command()
def install_doublepress_script(self):
return self._gw.install_script(self.sid, "doublepress", build_doublepress)

@command()
def install_longpress_script(self):
return self._gw.install_script(self.sid, "longpress", build_longpress)

@command()
def install_shake_script(self):
return self._gw.install_script(self.sid, "shake", build_shake)
Loading