Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zigbee suqare button and cube support via fake_device #709

Closed
wants to merge 12 commits into from
123 changes: 123 additions & 0 deletions miio/fake_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import calendar
import datetime
import logging
import socket
import struct
from functools import reduce

import sys
import ifaddr
from miio.protocol import Message

_LOGGER = logging.getLogger(__name__)


def ipv4_nonloop_ips():
def flatten(a, b):
a.extend(b)
return a

return list(
filter(
lambda ip: isinstance(ip, str) and not ip.startswith("127"),
map(
lambda ip: ip.ip,
reduce(
flatten, map(lambda adapter: adapter.ips, ifaddr.get_adapters()), []
),
),
)
)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads really, really complicated.. What for this is and is it necessary? I suppose for binding on multiple interfaces? If yes, that could also be potentially useful for discovery (iirc there is an issue about not being able to discover devices from other subnets). Is there maybe a library that could be used to achieve this?



class FakeDevice:
_device_id = None
_address = None
_token = None
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to define these as class attributes.


def __init__(self, device_id, token, address="0.0.0.0"):
self._device_id = device_id
self._token = token
self._address = address

def run(self, callback):
def build_ack(device: int):
# Original devices are using year 1970, but it seems current datetime is fine
timestamp = calendar.timegm(datetime.datetime.now().timetuple())
# ACK packet not signed, 16 bytes header + 16 bytes of zeroes
return struct.pack(">HHIII16s", 0x2131, 32, 0, device, timestamp, bytes(16))

helobytes = bytes.fromhex(
"21310020ffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
)

sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
# Gateway interacts only with port 54321
sock.bind((self._address, 54321))
_LOGGER.info(
"fake miio device started with address=%s device_id=%s callback=%s token=****",
self._address,
self._device_id,
callback,
)

while True:
data, [host, port] = sock.recvfrom(1024)
if data == helobytes:
_LOGGER.debug("%s:%s=>PING", host, port)
m = build_ack(self._device_id)
sock.sendto(m, (host, port))
_LOGGER.debug("%s:%s<=ACK(device_id=%s)", host, port, self._device_id)
else:
request = Message.parse(data, token=self._token)
value = request.data.value
_LOGGER.debug("%s:%s=>%s", host, port, value)
action, device_call_id = value["method"].split("_")
source_device_id = (
f"lumi.{device_call_id}" # All known devices use lumi. prefix
)
callback(source_device_id, action, value["params"])
# This result means OK, but some methods return ['ok'] instead of 0
# might be necessary to use different results for different methods
result = {"result": 0, "id": value["id"]}
header = {
"length": 0,
"unknown": 0,
"device_id": self._device_id,
"ts": datetime.datetime.now(),
}
msg = {
"data": {"value": result},
"header": {"value": header},
"checksum": 0,
}
response = Message.build(msg, token=self._token)
_LOGGER.debug("%s:%s<=%s", host, port, result)
sock.sendto(response, (host, port))


if __name__ == "__main__":

def callback(source_device, action, params):
_LOGGER.debug(f"CALLBACK {source_device}=>{action}({params})")

from gateway_scripts import fake_device_id

logging.basicConfig(level="DEBUG")

device_id = int(fake_device_id)
# Use real token on fake device for encryption
# encoded is used in scripts = key pair should match
# tokens = {
# "real": "9bc7c7ce6291d3e443fd7708608b9892",
# "encoded": "79cf21b08fb051499389f23c113477a4",
# }
if len(sys.argv) > 1:
device_token = bytes.fromhex(sys.argv[1])
else:
print(
"WARNING kae device starting with publically known token! Pass other token next time pls..."
)
device_token = bytes.fromhex("9bc7c7ce6291d3e443fd7708608b9892")
fake_device = FakeDevice(device_id, device_token)
fake_device.run(callback)
134 changes: 134 additions & 0 deletions miio/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@
from datetime import datetime
from enum import Enum, IntEnum
from typing import Optional
from random import randrange

import attr
import click

from .click_common import EnumType, command, format_output
from .device import Device
from .exceptions import DeviceException
from .fake_device import ipv4_nonloop_ips
from .gateway_scripts import (
action_id,
build_doublepress,
build_flip90,
build_flip180,
build_longpress,
build_move,
build_rotate,
build_shake,
build_shakeair,
build_singlepress,
build_taptap,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is waaaaay too specific to be inside the gateway class. We need a way to expose subdevice based actions some other way.

)
from .utils import brightness_and_color_to_int, int_to_brightness, int_to_rgb

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -166,6 +181,8 @@ def discover_devices(self):
DeviceType.Plug: AqaraPlug,
DeviceType.SensorHT: SensorHT,
DeviceType.AqaraHT: AqaraHT,
DeviceType.Cube: Cube,
DeviceType.AqaraSquareButton: AqaraSquareButton,
DeviceType.AqaraMagnet: AqaraMagnet,
DeviceType.AqaraSwitchOneChannel: AqaraSwitchOneChannel,
DeviceType.AqaraSwitchTwoChannels: AqaraSwitchTwoChannels,
Expand Down Expand Up @@ -207,6 +224,47 @@ def discover_devices(self):

return self._devices

def x_del(self, script_id):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename "x_del" to "unsubscribe_event"
and "script_id" to "subscription_id"

Copy link
Contributor Author

@bskaplou bskaplou May 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

x_del is name of method... python method name mimics MIIO method... Less conversions => more comprehension... x_del is not executed directly, it's low-level method which is called from uninstall ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright I understand, you are probably right if this function is only called by other functions

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this should be named otherwise, especially if this can be useful for other use cases. Otherwise it is probably simpler to simply use self.send where necessary (or at least rename this to something more descriptive, and prefix it with _ to indicate that it's a private method not to be used by others.

"""Delete script by id."""
return self.send("miIO.xdel", [script_id])

@command(
click.argument("sid"),
click.argument("command"),
click.argument("encoded_token"),
)
def subdevice_command(self, sid, command, encoded_token):
"""Send command to subdevice."""
self.discover_devices()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to do the discovery for this? If yes, I think it would be better to modify the discover_devices to return a dictionary of {sid: SubDevice}. This would allow dropping the next line and make it much more easier to comprehend.

target = list(filter(lambda subdevice: subdevice.sid == sid, self.devices))
if len(target) < 1:
return f"Device with sid={sid} not found"
elif not hasattr(target[0], command):
return f"Device with sid={sid} has no method {command}"
else:
return getattr(target[0], command)(encoded_token)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usage of hasattr and getattr usually indicate a code smell, i.e., something that could be done in a nicer way. Maybe the subdevice should have a decorator to mark exposed methods (not unlike how command is used) that could then be returned when accessing commands() property of the subdevice?


def install_script(self, sid, builder, encoded_token, ip=None):
"""Install script for by building script source and sending it with miio method. You need to run fake or real device to capture script execution results."""
if ip is None:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see this being much simpler, in my opinion the API should be more something like:

gw = Gateway()
cube = gw.get_device_by_id("cube id")
some_other_device = gw.get_device_by_id("my light")
cube.install_event_handler(cube, 'shake', some_other_device, 'some action')

Likewise, if I have a cube instance, I would like to query the list of available events and already installed scripts:

print("available events: %s" % cube.events)  # I mention this way of exposing the available events in some other comment

# if there is only one script possible, otherwise we need a way to get a list of already installed scripts to make it possible for the user to decide which one to remove
cube.remove_script("shake")  

# the other case
handlers = cube.installed_event_handlers
for handler in handers:
    print("handler %s")

addresses = ipv4_nonloop_ips()
my_ip = addresses[0] # Taking first public IP
else:
my_ip = ip

data_tkn = randrange(5000, 10000)
source = builder(sid, my_ip, encoded_token)
return self.send(
"send_data_frame",
{
"cur": 0,
"data": source,
"data_tkn": data_tkn,
"total": 1,
"type": "scene",
},
)

@command(click.argument("property"))
def get_prop(self, property):
"""Get the value of a property for given sid."""
Expand Down Expand Up @@ -744,6 +802,21 @@ def get_firmware_version(self) -> Optional[int]:
)
return self._fw_ver

@command()
def uninstall_scripts(self, encoded_token):
return dict(
map(
lambda action: (
action,
(
action_id[action](self.sid),
self._gw.x_del(action_id[action](self.sid)),
),
),
action_id.keys(),
)
)
Comment on lines +806 to +818
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is way coo complicated and rather hard to interpret.

Is the goal to remove all scripts? Is there a way to get a list of current events, that could be iterated and then removed? How does removal/overwriting of a single script look like?

How is the encoded token used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes... goal is to remove all scripts at least potentially could be installed in device... As far as i get list of installed scripts is not exposed through miio protocol... Here I xdel all scripts for all possible scripts to avoid memory leak on gw

Encoded token is used during script installation and somehow linked with real token. Unfortunately algo is unknown...

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think there were some sort of list functionality related to scenes in the gateway implementation at some point, that could be related to this?

Anyway, I pointed out the encoded_token here as it is given as a parameter but never used in the method itself?



class AqaraHT(SubDevice):
"""Subdevice AqaraHT specific properties and methods"""
Expand Down Expand Up @@ -928,6 +1001,67 @@ def update(self):
self._props.status_ch1 = values[1]
self._props.load_power = values[2]

class Cube(SubDevice):
"""Subdevice Cube specific properties and methods"""

properties = []

@command()
def install_move_script(self, encoded_token):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these installs should probably be just a single method in SubDevice: install_event_handler or something like that, that would take 1) device, 2) event, 3) event receiver, and 4) method to be called.

The subdevices themselves should have a class attribute defining the available events. So in the case of the cube, the Cube class should define that it has events:

@property
def events():
    return ["tap_twice", "shake"]

The users of the lib can then request available events, and create a script for those.

"""Generate and install script which captures move event and sends miio package to device"""
return self._gw.install_script(self.sid, build_move, encoded_token)

@command()
def install_rotate_script(self, encoded_token):
"""Generate and install script which captures rotate event and sends miio package to device"""
return self._gw.install_script(self.sid, build_rotate, encoded_token)

@command()
def install_shake_script(self, encoded_token):
"""Generate and install script which captures shake in air event and sends miio package to device"""
return self._gw.install_script(self.sid, build_shakeair, encoded_token)

@command()
def install_flip90_script(self, encoded_token):
"""Generate and install script which captures horizontal 90 flip and sends miio package to device"""
return self._gw.install_script(self.sid, build_flip90, encoded_token)

@command()
def install_taptap_script(self, encoded_token):
"""Generate and install script which captures double tap on surface event and sends miio package to device"""
return self._gw.install_script(self.sid, build_taptap, encoded_token)

@command()
def install_flip180_script(self, encoded_token):
"""Generate and install script which captures horizontal 180 flip and sends miio package to device"""
return self._gw.install_script(self.sid, build_flip180, encoded_token)


class AqaraSquareButton(SubDevice):
"""Subdevice AqaraSquareButton specific properties and methods"""

properties = []

@command()
def install_singlepress_script(self, encoded_token):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comment about how to expose events to users.

"""Generate and install script which captures single press event and sends miio package to device"""
return self._gw.install_script(self.sid, build_singlepress, encoded_token)

@command()
def install_doublepress_script(self, encoded_token):
"""Generate and install script which captures double press event and sends miio package to device"""
return self._gw.install_script(self.sid, build_doublepress, encoded_token)

@command()
def install_longpress_script(self, encoded_token):
"""Generate and install script which captures loooong press event and sends miio package to device"""
return self._gw.install_script(self.sid, build_longpress, encoded_token)

@command()
def install_shake_script(self, encoded_token):
"""Generate and install script which captures shake in air event and sends miio package to device"""
return self._gw.install_script(self.sid, build_shake, encoded_token)


class AqaraWallOutlet(SubDevice):
"""Subdevice AqaraWallOutlet specific properties and methods"""
Expand Down
Loading