-
-
Notifications
You must be signed in to change notification settings - Fork 32k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support for Unifi direct access device tracker (No unifi controller s…
…oftware)
- Loading branch information
William Scanlon
committed
Nov 17, 2017
1 parent
f43092c
commit 90fcca8
Showing
5 changed files
with
309 additions
and
0 deletions.
There are no files selected for viewing
134 changes: 134 additions & 0 deletions
134
homeassistant/components/device_tracker/unifi_direct.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
""" | ||
Support for Unifi AP direct access. | ||
For more details about this platform, please refer to the documentation at | ||
https://home-assistant.io/components/device_tracker.unifi_direct/ | ||
""" | ||
import logging | ||
import json | ||
|
||
import voluptuous as vol | ||
|
||
import homeassistant.helpers.config_validation as cv | ||
from homeassistant.components.device_tracker import ( | ||
DOMAIN, PLATFORM_SCHEMA, DeviceScanner) | ||
from homeassistant.const import ( | ||
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, | ||
CONF_PORT) | ||
|
||
REQUIREMENTS = ['pexpect==4.0.1'] | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
DEFAULT_SSH_PORT = 22 | ||
UNIFI_COMMAND = 'mca-dump | tr -d "\n"' | ||
UNIFI_SSID_TABLE = "vap_table" | ||
UNIFI_CLIENT_TABLE = "sta_table" | ||
|
||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ | ||
vol.Required(CONF_HOST): cv.string, | ||
vol.Required(CONF_PASSWORD): cv.string, | ||
vol.Required(CONF_USERNAME): cv.string, | ||
vol.Optional(CONF_PORT, default=DEFAULT_SSH_PORT): cv.port | ||
}) | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def get_scanner(hass, config): | ||
"""Validate the configuration and return a Unifi direct scanner.""" | ||
scanner = UnifiDeviceScanner(config[DOMAIN]) | ||
if not scanner.connected: | ||
return False | ||
return scanner | ||
|
||
|
||
class UnifiDeviceScanner(DeviceScanner): | ||
"""This class queries Unifi wireless access point.""" | ||
|
||
def __init__(self, config): | ||
"""Initialize the scanner.""" | ||
self.host = config[CONF_HOST] | ||
self.username = config[CONF_USERNAME] | ||
self.password = config[CONF_PASSWORD] | ||
self.port = config[CONF_PORT] | ||
self.ssh = None | ||
self.connected = False | ||
self.last_results = {} | ||
self._connect() | ||
|
||
def scan_devices(self): | ||
"""Scan for new devices and return a list with found device IDs.""" | ||
result = _response_to_json(self._get_update()) | ||
if result: | ||
self.last_results = result | ||
return self.last_results.keys() | ||
|
||
def get_device_name(self, device): | ||
"""Return the name of the given device or None if we don't know.""" | ||
hostname = next(( | ||
value.get('hostname') for key, value in self.last_results.items() | ||
if key.upper() == device.upper()), None) | ||
if hostname is not None: | ||
hostname = str(hostname) | ||
return hostname | ||
|
||
def _connect(self): | ||
"""Connect to the Unifi AP SSH server.""" | ||
from pexpect import pxssh, exceptions | ||
|
||
self.ssh = pxssh.pxssh() | ||
try: | ||
self.ssh.login(self.host, self.username, | ||
password=self.password, port=self.port) | ||
self.connected = True | ||
except exceptions.EOF: | ||
_LOGGER.error("Connection refused. SSH enabled?") | ||
self._disconnect() | ||
|
||
def _disconnect(self): | ||
"""Disconnect the current SSH connection.""" | ||
# pylint: disable=broad-except | ||
try: | ||
self.ssh.logout() | ||
except Exception: | ||
pass | ||
finally: | ||
self.ssh = None | ||
|
||
self.connected = False | ||
|
||
def _get_update(self): | ||
from pexpect import pxssh | ||
|
||
try: | ||
if not self.connected: | ||
self._connect() | ||
self.ssh.sendline(UNIFI_COMMAND) | ||
self.ssh.prompt() | ||
return self.ssh.before | ||
except pxssh.ExceptionPxssh as err: | ||
_LOGGER.error("Unexpected SSH error: %s", str(err)) | ||
self._disconnect() | ||
return None | ||
except AssertionError as err: | ||
_LOGGER.error("Connection to AP unavailable: %s", str(err)) | ||
self._disconnect() | ||
return None | ||
|
||
|
||
def _response_to_json(response): | ||
try: | ||
json_response = json.loads(str(response)[31:-1].replace("\\", "")) | ||
_LOGGER.debug(str(json_response)) | ||
ssid_table = json_response.get(UNIFI_SSID_TABLE) | ||
active_clients = {} | ||
|
||
for ssid in ssid_table: | ||
client_table = ssid.get(UNIFI_CLIENT_TABLE) | ||
for client in client_table: | ||
active_clients[client.get("mac")] = client | ||
|
||
return active_clients | ||
except ValueError: | ||
_LOGGER.error("Failed to decode response from AP.") | ||
return {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
"""The tests for the Unifi direct device tracker platform.""" | ||
import os | ||
from datetime import timedelta | ||
import unittest | ||
from unittest import mock | ||
from unittest.mock import patch | ||
|
||
import pytest | ||
import voluptuous as vol | ||
|
||
from homeassistant.setup import setup_component | ||
from homeassistant.components import device_tracker | ||
from homeassistant.components.device_tracker import ( | ||
CONF_CONSIDER_HOME, CONF_TRACK_NEW) | ||
from homeassistant.components.device_tracker.unifi_direct import ( | ||
DOMAIN, CONF_PORT, PLATFORM_SCHEMA, _response_to_json, get_scanner) | ||
from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME, | ||
CONF_HOST) | ||
|
||
from tests.common import ( | ||
get_test_home_assistant, assert_setup_component, | ||
mock_component, load_fixture) | ||
|
||
|
||
class TestComponentsDeviceTrackerUnifiDirect(unittest.TestCase): | ||
"""Tests for the Unifi direct device tracker platform.""" | ||
|
||
hass = None | ||
scanner_path = 'homeassistant.components.device_tracker.' + \ | ||
'unifi_direct.UnifiDeviceScanner' | ||
|
||
def setup_method(self, _): | ||
"""Setup things to be run when tests are started.""" | ||
self.hass = get_test_home_assistant() | ||
mock_component(self.hass, 'zone') | ||
|
||
def teardown_method(self, _): | ||
"""Stop everything that was started.""" | ||
self.hass.stop() | ||
try: | ||
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES)) | ||
except FileNotFoundError: | ||
pass | ||
|
||
@mock.patch(scanner_path, | ||
return_value=mock.MagicMock()) | ||
def test_get_scanner(self, unifi_mock): \ | ||
# pylint: disable=invalid-name | ||
"""Test creating an Unifi direct scanner with a password.""" | ||
conf_dict = { | ||
DOMAIN: { | ||
CONF_PLATFORM: 'unifi_direct', | ||
CONF_HOST: 'fake_host', | ||
CONF_USERNAME: 'fake_user', | ||
CONF_PASSWORD: 'fake_pass', | ||
CONF_TRACK_NEW: True, | ||
CONF_CONSIDER_HOME: timedelta(seconds=180) | ||
} | ||
} | ||
|
||
with assert_setup_component(1, DOMAIN): | ||
assert setup_component(self.hass, DOMAIN, conf_dict) | ||
|
||
conf_dict[DOMAIN][CONF_PORT] = 22 | ||
self.assertEqual(unifi_mock.call_args, mock.call(conf_dict[DOMAIN])) | ||
|
||
@patch('pexpect.pxssh.pxssh') | ||
def test_get_device_name(self, mock_ssh): | ||
""""Testing MAC matching.""" | ||
conf_dict = { | ||
DOMAIN: { | ||
CONF_PLATFORM: 'unifi_direct', | ||
CONF_HOST: 'fake_host', | ||
CONF_USERNAME: 'fake_user', | ||
CONF_PASSWORD: 'fake_pass', | ||
CONF_PORT: 22, | ||
CONF_TRACK_NEW: True, | ||
CONF_CONSIDER_HOME: timedelta(seconds=180) | ||
} | ||
} | ||
mock_ssh.return_value.before = load_fixture('unifi_direct.txt') | ||
scanner = get_scanner(self.hass, conf_dict) | ||
devices = scanner.scan_devices() | ||
self.assertEqual(23, len(devices)) | ||
self.assertEqual("iPhone", | ||
scanner.get_device_name("98:00:c6:56:34:12")) | ||
self.assertEqual("iPhone", | ||
scanner.get_device_name("98:00:C6:56:34:12")) | ||
|
||
@patch('pexpect.pxssh.pxssh.logout') | ||
@patch('pexpect.pxssh.pxssh.login') | ||
def test_failed_to_log_in(self, mock_login, mock_logout): | ||
""""Testing exception at login results in False.""" | ||
from pexpect import exceptions | ||
|
||
conf_dict = { | ||
DOMAIN: { | ||
CONF_PLATFORM: 'unifi_direct', | ||
CONF_HOST: 'fake_host', | ||
CONF_USERNAME: 'fake_user', | ||
CONF_PASSWORD: 'fake_pass', | ||
CONF_PORT: 22, | ||
CONF_TRACK_NEW: True, | ||
CONF_CONSIDER_HOME: timedelta(seconds=180) | ||
} | ||
} | ||
|
||
mock_login.side_effect = exceptions.EOF("Test") | ||
scanner = get_scanner(self.hass, conf_dict) | ||
self.assertFalse(scanner) | ||
|
||
@patch('pexpect.pxssh.pxssh.logout') | ||
@patch('pexpect.pxssh.pxssh.login', autospec=True) | ||
@patch('pexpect.pxssh.pxssh.prompt') | ||
@patch('pexpect.pxssh.pxssh.sendline') | ||
def test_to_get_update(self, mock_sendline, mock_prompt, mock_login, | ||
mock_logout): | ||
""""Testing exception in get_update matching.""" | ||
conf_dict = { | ||
DOMAIN: { | ||
CONF_PLATFORM: 'unifi_direct', | ||
CONF_HOST: 'fake_host', | ||
CONF_USERNAME: 'fake_user', | ||
CONF_PASSWORD: 'fake_pass', | ||
CONF_PORT: 22, | ||
CONF_TRACK_NEW: True, | ||
CONF_CONSIDER_HOME: timedelta(seconds=180) | ||
} | ||
} | ||
|
||
scanner = get_scanner(self.hass, conf_dict) | ||
# mock_sendline.side_effect = AssertionError("Test") | ||
mock_prompt.side_effect = AssertionError("Test") | ||
devices = scanner._get_update() # pylint: disable=protected-access | ||
self.assertTrue(devices is None) | ||
|
||
def test_good_reponse_parses(self): | ||
"""Test that the response form the AP parses to JSON correctly.""" | ||
response = _response_to_json(load_fixture('unifi_direct.txt')) | ||
self.assertTrue(response != {}) | ||
|
||
def test_bad_reponse_returns_none(self): | ||
"""Test that a bad response form the AP parses to JSON correctly.""" | ||
self.assertTrue(_response_to_json("{(}") == {}) | ||
|
||
|
||
def test_config_error(): | ||
"""Test for configuration errors.""" | ||
with pytest.raises(vol.Invalid): | ||
PLATFORM_SCHEMA({ | ||
# no username | ||
CONF_PASSWORD: 'password', | ||
CONF_PLATFORM: DOMAIN, | ||
CONF_HOST: 'myhost', | ||
'port': 123, | ||
}) | ||
with pytest.raises(vol.Invalid): | ||
PLATFORM_SCHEMA({ | ||
# no password | ||
CONF_USERNAME: 'foo', | ||
CONF_PLATFORM: DOMAIN, | ||
CONF_HOST: 'myhost', | ||
'port': 123, | ||
}) | ||
with pytest.raises(vol.Invalid): | ||
PLATFORM_SCHEMA({ | ||
CONF_PLATFORM: DOMAIN, | ||
CONF_USERNAME: 'foo', | ||
CONF_PASSWORD: 'password', | ||
CONF_HOST: 'myhost', | ||
'port': 'foo', # bad port! | ||
}) |
Large diffs are not rendered by default.
Oops, something went wrong.