Skip to content

Commit

Permalink
Add APRS device tracker component
Browse files Browse the repository at this point in the history
This component keeps open a connection to the APRS-IS infrastructure so
messages generated by filtered callsigns can be immediately acted upon.
Any messages with certain values for the 'format' key are position
reports and are parsed into device tracker entities.
  • Loading branch information
PhilRW committed Apr 15, 2019
1 parent 2f89f88 commit 8c0e561
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 0 deletions.
1 change: 1 addition & 0 deletions homeassistant/components/aprs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""The APRS component."""
166 changes: 166 additions & 0 deletions homeassistant/components/aprs/device_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Support for APRS device tracking."""

import logging
import threading

import voluptuous as vol

from homeassistant.components.device_tracker import (
ATTR_GPS_ACCURACY, ATTR_LATITUDE, ATTR_LONGITUDE, PLATFORM_SCHEMA)
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP)
from homeassistant.exceptions import PlatformNotReady
import homeassistant.helpers.config_validation as cv
from homeassistant.util import slugify

DOMAIN = 'aprs'

_LOGGER = logging.getLogger(__name__)

ATTR_ALTITUDE = 'altitude'
ATTR_COURSE = 'course'
ATTR_COMMENT = 'comment'
ATTR_FROM = 'from'
ATTR_FORMAT = 'format'
ATTR_POS_AMBIGUITY = 'posambiguity'
ATTR_SPEED = 'speed'

CONF_CALLSIGNS = 'callsigns'

DEFAULT_HOST = 'rotate.aprs2.net'
DEFAULT_PASSWORD = '-1'

FILTER_PORT = 14580

MSG_FORMATS = ['compressed', 'uncompressed', 'mic-e']

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_CALLSIGNS): cv.ensure_list,
vol.Required(CONF_USERNAME): cv.string,
vol.Optional(CONF_PASSWORD,
default=DEFAULT_PASSWORD): cv.string,
vol.Optional(CONF_HOST,
default=DEFAULT_HOST): cv.string
})


def make_filter(callsigns: list) -> str:
"""Make a server-side filter from a list of callsigns."""
return ' '.join('b/{0}'.format(cs.upper()) for cs in callsigns)


def gps_accuracy(gps, posambiguity: int) -> int:
"""Calculate the GPS accuracy based on APRS posambiguity."""
import geopy.distance

pos_a_map = {0: 0,
1: 1 / 600,
2: 1 / 60,
3: 1 / 6,
4: 1}
if posambiguity in pos_a_map:
degrees = pos_a_map[posambiguity]

gps2 = (gps[0], gps[1] + degrees)
dist_m = geopy.distance.distance(gps, gps2).m

accuracy = round(dist_m)
else:
message = "APRS position ambiguity must be 0-4, not '{0}'.".format(
posambiguity)
raise ValueError(message)

return accuracy


def setup_scanner(hass, config, see, discovery_info=None):
"""Set up the APRS tracker."""
callsigns = config.get(CONF_CALLSIGNS)
server_filter = make_filter(callsigns)

callsign = config.get(CONF_USERNAME)
password = config.get(CONF_PASSWORD)
host = config.get(CONF_HOST)
aprs_listener = AprsListenerThread(
callsign, password, host, server_filter, see)

def aprs_disconnect(event):
"""Stop the APRS connection."""
aprs_listener.stop()

def aprs_connect(event):
"""Start the APRS connection."""
aprs_listener.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP,
aprs_disconnect)

hass.bus.listen_once(EVENT_HOMEASSISTANT_START,
aprs_connect)

return True


class AprsListenerThread(threading.Thread):
"""APRS message listener."""

def __init__(self, callsign, password, host, server_filter, see):
"""Initialize the class."""
super().__init__()

import aprslib

self.callsign = callsign
self.host = host
self.server_filter = server_filter
self.see = see
self.ais = aprslib.IS(
self.callsign, passwd=password, host=self.host, port=FILTER_PORT)

def run(self):
"""Connect to APRS and listen for data."""
self.ais.set_filter(self.server_filter)
from aprslib import ConnectionError as AprsConnectionError
from aprslib import LoginError

try:
_LOGGER.info("Opening connection to %s with callsign %s",
self.host, self.callsign)
self.ais.connect()
self.ais.consumer(callback=self.rx_msg, immortal=True)
except (AprsConnectionError, LoginError) as err:
raise PlatformNotReady(err)
except OSError:
_LOGGER.info("Closing connection to %s with callsign %s",
self.host, self.callsign)

def stop(self):
"""Close the connection to the APRS network."""
self.ais.close()

def rx_msg(self, msg):
"""Receive message and process if position."""
_LOGGER.debug("APRS message received: %s", str(msg))
if msg[ATTR_FORMAT] in MSG_FORMATS:
dev_id = slugify(msg[ATTR_FROM])
lat = msg[ATTR_LATITUDE]
lon = msg[ATTR_LONGITUDE]

attrs = {}
if ATTR_POS_AMBIGUITY in msg:
pos_amb = msg[ATTR_POS_AMBIGUITY]
try:
attrs[ATTR_GPS_ACCURACY] = gps_accuracy((lat, lon),
pos_amb)
except ValueError:
_LOGGER.warning(
"APRS message contained invalid posambiguity: %s",
str(pos_amb))
for attr in [ATTR_ALTITUDE,
ATTR_COMMENT,
ATTR_COURSE,
ATTR_SPEED]:
if attr in msg:
attrs[attr] = msg[attr]

self.see(dev_id=dev_id, gps=(lat, lon), attributes=attrs)
11 changes: 11 additions & 0 deletions homeassistant/components/aprs/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"domain": "aprs",
"name": "APRS",
"documentation": "https://www.home-assistant.io/components/aprs",
"dependencies": [],
"codeowners": ["@PhilRW"],
"requirements": [
"aprslib==0.6.46",
"geopy==1.19.0"
]
}
6 changes: 6 additions & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ apcaccess==0.0.13
# homeassistant.components.apns
apns2==0.3.0

# homeassistant.components.aprs
aprslib==0.6.46

# homeassistant.components.aqualogic
aqualogic==1.0

Expand Down Expand Up @@ -463,6 +466,9 @@ geizhals==0.0.9
# homeassistant.components.usgs_earthquakes_feed
geojson_client==0.3

# homeassistant.components.aprs
geopy==1.19.0

# homeassistant.components.geo_rss_events
georss_generic_client==0.2

Expand Down
3 changes: 3 additions & 0 deletions requirements_test_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ gTTS-token==1.1.3
# homeassistant.components.usgs_earthquakes_feed
geojson_client==0.3

# homeassistant.components.aprs
geopy==1.19.0

# homeassistant.components.geo_rss_events
georss_generic_client==0.2

Expand Down
1 change: 1 addition & 0 deletions script/gen_requirements_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
'feedparser-homeassistant',
'foobot_async',
'geojson_client',
'geopy',
'georss_generic_client',
'georss_ign_sismologia_client',
'google-api-python-client',
Expand Down
1 change: 1 addition & 0 deletions tests/components/aprs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tests for the APRS component."""
131 changes: 131 additions & 0 deletions tests/components/aprs/test_device_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Test APRS device tracker."""
import sys
import unittest
from unittest.mock import Mock, patch

import homeassistant.components.aprs.device_tracker as device_tracker
from homeassistant.const import EVENT_HOMEASSISTANT_START

from tests.common import get_test_home_assistant

DEFAULT_PORT = 14580

TEST_CALLSIGN = 'testcall'
TEST_COORDS_NULL_ISLAND = (0, 0)
TEST_FILTER = 'testfilter'
TEST_HOST = 'testhost'
TEST_PASSWORD = 'testpass'


class TestAprsDeviceTracker(unittest.TestCase):
"""Test Case for APRS device tracker."""

def test_make_filter(self):
"""Test filter."""
callsigns = [
'CALLSIGN1',
'callsign2'
]
res = device_tracker.make_filter(callsigns)
self.assertEqual(res, "b/CALLSIGN1 b/CALLSIGN2")

def test_gps_accuracy_0(self):
"""Test GPS accuracy level 0."""
acc = device_tracker.gps_accuracy(TEST_COORDS_NULL_ISLAND, 0)
self.assertEqual(acc, 0)

def test_gps_accuracy_1(self):
"""Test GPS accuracy level 1."""
acc = device_tracker.gps_accuracy(TEST_COORDS_NULL_ISLAND, 1)
self.assertEqual(acc, 186)

def test_gps_accuracy_2(self):
"""Test GPS accuracy level 2."""
acc = device_tracker.gps_accuracy(TEST_COORDS_NULL_ISLAND, 2)
self.assertEqual(acc, 1855)

def test_gps_accuracy_3(self):
"""Test GPS accuracy level 3."""
acc = device_tracker.gps_accuracy(TEST_COORDS_NULL_ISLAND, 3)
self.assertEqual(acc, 18553)

def test_gps_accuracy_4(self):
"""Test GPS accuracy level 4."""
acc = device_tracker.gps_accuracy(TEST_COORDS_NULL_ISLAND, 4)
self.assertEqual(acc, 111319)

def test_gps_accuracy_invalid_int(self):
"""Test GPS accuracy with invalid input."""
level = 5
try:
device_tracker.gps_accuracy(TEST_COORDS_NULL_ISLAND, level)
self.fail("No exception.")
except ValueError:
pass

def test_gps_accuracy_invalid_string(self):
"""Test GPS accuracy with invalid input."""
level = "not an int"
try:
device_tracker.gps_accuracy(TEST_COORDS_NULL_ISLAND, level)
self.fail("No exception.")
except ValueError:
pass

def test_gps_accuracy_invalid_float(self):
"""Test GPS accuracy with invalid input."""
level = 1.2
try:
device_tracker.gps_accuracy(TEST_COORDS_NULL_ISLAND, level)
self.fail("No exception.")
except ValueError:
pass

def test_aprs_listener_thread(self):
"""Test listener thread."""
sys.modules['aprslib'] = Mock()

with patch('aprslib.IS') as mock_ais:
callsign = TEST_CALLSIGN
password = TEST_PASSWORD
host = TEST_HOST
server_filter = TEST_FILTER
port = DEFAULT_PORT
see = Mock()

listener = device_tracker.AprsListenerThread(
callsign, password, host, server_filter, see)
listener.run()

self.assertEqual(listener.callsign, callsign)
self.assertEqual(listener.host, host)
self.assertEqual(listener.server_filter, server_filter)
self.assertEqual(listener.see, see)
mock_ais.assert_called_with(
callsign, passwd=password, host=host, port=port)

def test_setup_scanner(self):
"""Test setup_scanner."""
with patch('homeassistant.components.'
'aprs.device_tracker.AprsListenerThread') as listener:
hass = get_test_home_assistant()
hass.start()

config = {
'username': TEST_CALLSIGN,
'password': TEST_PASSWORD,
'host': TEST_HOST,
'callsigns': [
'XX0FOO*',
'YY0BAR-1']
}

see = Mock()
res = device_tracker.setup_scanner(hass, config, see)
hass.bus.fire(EVENT_HOMEASSISTANT_START)
hass.stop()

self.assertTrue(res)
listener.assert_called_with(
TEST_CALLSIGN, TEST_PASSWORD, TEST_HOST,
'b/XX0FOO* b/YY0BAR-1', see)

0 comments on commit 8c0e561

Please sign in to comment.