Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InfluxDB send retry after IOError #10263

Merged
merged 17 commits into from
Nov 24, 2017
Merged
91 changes: 91 additions & 0 deletions homeassistant/components/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/influxdb/
"""
from datetime import timedelta
from functools import wraps, partial
import logging
import re

Expand All @@ -16,6 +18,7 @@
CONF_EXCLUDE, CONF_INCLUDE, CONF_DOMAINS, CONF_ENTITIES)
from homeassistant.helpers import state as state_helper
from homeassistant.helpers.entity_values import EntityValues
from homeassistant.util import utcnow
import homeassistant.helpers.config_validation as cv

REQUIREMENTS = ['influxdb==4.1.1']
Expand All @@ -30,6 +33,8 @@
CONF_COMPONENT_CONFIG = 'component_config'
CONF_COMPONENT_CONFIG_GLOB = 'component_config_glob'
CONF_COMPONENT_CONFIG_DOMAIN = 'component_config_domain'
CONF_RETRY_COUNT = 'max_retries'
CONF_RETRY_QUEUE = 'retry_queue_limit'

DEFAULT_DATABASE = 'home_assistant'
DEFAULT_VERIFY_SSL = True
Expand Down Expand Up @@ -58,6 +63,8 @@
vol.Optional(CONF_DB_NAME, default=DEFAULT_DATABASE): cv.string,
vol.Optional(CONF_PORT): cv.port,
vol.Optional(CONF_SSL): cv.boolean,
vol.Optional(CONF_RETRY_COUNT, default=0): cv.positive_int,
vol.Optional(CONF_RETRY_QUEUE, default=20): cv.positive_int,
vol.Optional(CONF_DEFAULT_MEASUREMENT): cv.string,
vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
vol.Optional(CONF_TAGS, default={}):
Expand Down Expand Up @@ -119,6 +126,8 @@ def setup(hass, config):
conf[CONF_COMPONENT_CONFIG],
conf[CONF_COMPONENT_CONFIG_DOMAIN],
conf[CONF_COMPONENT_CONFIG_GLOB])
max_tries = conf.get(CONF_RETRY_COUNT)
queue_limit = conf.get(CONF_RETRY_QUEUE)

try:
influx = InfluxDBClient(**kwargs)
Expand Down Expand Up @@ -213,6 +222,11 @@ def influx_event_listener(event):

json_body[0]['tags'].update(tags)

_write_data(json_body)

@RetryOnError(hass, retry_limit=max_tries, retry_delay=20,
queue_limit=queue_limit)
def _write_data(json_body):
try:
influx.write_points(json_body)
except exceptions.InfluxDBClientError:
Expand All @@ -221,3 +235,80 @@ def influx_event_listener(event):
hass.bus.listen(EVENT_STATE_CHANGED, influx_event_listener)

return True


class RetryOnError(object):
"""A class for retrying a failed task a certain amount of tries.

This method decorator makes a method retrying on errors. If there was an
uncaught exception, it schedules another try to execute the task after a
retry delay. It does this up to the maximum number of retries.

It can be used for all probable "self-healing" problems like network
outages. The task will be rescheduled using HAs scheduling mechanism.

It takes a Hass instance, a maximum number of retries and a retry delay
in seconds as arguments.

The queue limit defines the maximum number of calls that are allowed to
be queued at a time. If this number is reached, every new call discards
an old one.
"""

def __init__(self, hass, retry_limit=0, retry_delay=20, queue_limit=100):
"""Initialize the decorator."""
self.hass = hass
self.retry_limit = retry_limit
self.retry_delay = timedelta(seconds=retry_delay)
self.queue_limit = queue_limit

def __call__(self, method):
"""Decorate the target method."""
from homeassistant.helpers.event import track_point_in_utc_time

@wraps(method)
def wrapper(*args, **kwargs):
"""Wrapped method."""
# pylint: disable=protected-access
if not hasattr(wrapper, "_retry_queue"):
wrapper._retry_queue = []

def scheduled(retry=0, untrack=None, event=None):
"""Call the target method.

It is called directly at the first time and then called
scheduled within the Hass mainloop.
"""
if untrack is not None:
wrapper._retry_queue.remove(untrack)

# pylint: disable=broad-except
try:
method(*args, **kwargs)
except Exception as ex:
if retry == self.retry_limit:
raise
if len(wrapper._retry_queue) >= self.queue_limit:
last = wrapper._retry_queue.pop(0)
if 'remove' in last:
func = last['remove']
func()
if 'exc' in last:
_LOGGER.error(
"Retry queue overflow, drop oldest entry: %s",
str(last['exc']))

target = utcnow() + self.retry_delay
tracking = {'target': target}
remove = track_point_in_utc_time(self.hass,
partial(scheduled,
retry + 1,
tracking),
target)
tracking['remove'] = remove
tracking["exc"] = ex
wrapper._retry_queue.append(tracking)

scheduled()

return wrapper

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no newline at end of file

1 change: 1 addition & 0 deletions homeassistant/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,4 @@ def wrapper(*args, **kwargs):
throttle[0].release()

return wrapper

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blank line at end of file

170 changes: 169 additions & 1 deletion tests/components/test_influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@
import datetime
from unittest import mock

from datetime import timedelta
from unittest.mock import MagicMock

import influxdb as influx_client

from homeassistant.util import dt as dt_util
from homeassistant import core as ha
from homeassistant.setup import setup_component
import homeassistant.components.influxdb as influxdb
from homeassistant.const import EVENT_STATE_CHANGED, STATE_OFF, STATE_ON, \
Expand Down Expand Up @@ -36,6 +41,7 @@ def test_setup_config_full(self, mock_client):
'database': 'db',
'username': 'user',
'password': 'password',
'max_retries': 4,
'ssl': 'False',
'verify_ssl': 'False',
}
Expand Down Expand Up @@ -91,7 +97,7 @@ def test_setup_query_fail(self, mock_client):
influx_client.exceptions.InfluxDBClientError('fake')
assert not setup_component(self.hass, influxdb.DOMAIN, config)

def _setup(self):
def _setup(self, **kwargs):
"""Setup the client."""
config = {
'influxdb': {
Expand All @@ -104,6 +110,7 @@ def _setup(self):
}
}
}
config['influxdb'].update(kwargs)
assert setup_component(self.hass, influxdb.DOMAIN, config)
self.handler_method = self.hass.bus.listen.call_args_list[0][0][1]

Expand Down Expand Up @@ -649,3 +656,164 @@ def test_event_listener_component_override_measurement(self, mock_client):
mock.call(body)
)
mock_client.return_value.write_points.reset_mock()

def test_scheduled_write(self, mock_client):
"""Test the event listener to retry after write failures."""
self._setup(max_retries=1)

state = mock.MagicMock(
state=1, domain='fake', entity_id='entity.id', object_id='entity',
attributes={})
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
mock_client.return_value.write_points.side_effect = \
IOError('foo')

start = dt_util.utcnow()

self.handler_method(event)
json_data = mock_client.return_value.write_points.call_args[0][0]
self.assertEqual(mock_client.return_value.write_points.call_count, 1)

shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_client.return_value.write_points.call_count, 2)
mock_client.return_value.write_points.assert_called_with(json_data)

shifted_time = shifted_time + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_client.return_value.write_points.call_count, 2)


class TestRetryOnErrorDecorator(unittest.TestCase):
"""Test the RetryOnError decorator."""

def setUp(self):
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()

def tearDown(self):
"""Clear data."""
self.hass.stop()

def test_no_retry(self):
"""Test that it does not retry if configured."""
mock_method = MagicMock()
wrapped = influxdb.RetryOnError(self.hass)(mock_method)
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 1)
mock_method.assert_called_with(1, 2, test=3)

mock_method.side_effect = Exception()
self.assertRaises(Exception, wrapped, 1, 2, test=3)
self.assertEqual(mock_method.call_count, 2)
mock_method.assert_called_with(1, 2, test=3)

def test_single_retry(self):
"""Test that retry stops after a single try if configured."""
mock_method = MagicMock()
retryer = influxdb.RetryOnError(self.hass, retry_limit=1)
wrapped = retryer(mock_method)
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 1)
mock_method.assert_called_with(1, 2, test=3)

start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_method.call_count, 1)

mock_method.side_effect = Exception()
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 2)
mock_method.assert_called_with(1, 2, test=3)

for cnt in range(3):
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_method.call_count, 3)
mock_method.assert_called_with(1, 2, test=3)

def test_multi_retry(self):
"""Test that multiple retries work."""
mock_method = MagicMock()
retryer = influxdb.RetryOnError(self.hass, retry_limit=4)
wrapped = retryer(mock_method)
mock_method.side_effect = Exception()

wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 1)
mock_method.assert_called_with(1, 2, test=3)

for cnt in range(3):
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_method.call_count, cnt + 2)
mock_method.assert_called_with(1, 2, test=3)

def test_max_queue(self):
"""Test the maximum queue length."""
# make a wrapped method
mock_method = MagicMock()
retryer = influxdb.RetryOnError(
self.hass, retry_limit=4, queue_limit=3)
wrapped = retryer(mock_method)
mock_method.side_effect = Exception()

# call it once, call fails, queue fills to 1
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 1)
mock_method.assert_called_with(1, 2, test=3)
self.assertEqual(len(wrapped._retry_queue), 1)

# two more calls that failed. queue is 3
wrapped(1, 2, test=3)
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 3)
self.assertEqual(len(wrapped._retry_queue), 3)

# another call, queue gets limited to 3
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 4)
self.assertEqual(len(wrapped._retry_queue), 3)

# time passes
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()

# only the three queued calls where repeated
self.assertEqual(mock_method.call_count, 7)
self.assertEqual(len(wrapped._retry_queue), 3)

# another call, queue stays limited
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 8)
self.assertEqual(len(wrapped._retry_queue), 3)

# disable the side effect
mock_method.side_effect = None

# time passes, all calls should succeed
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()

# three queued calls succeeded, queue empty.
self.assertEqual(mock_method.call_count, 11)
self.assertEqual(len(wrapped._retry_queue), 0)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no newline at end of file

1 change: 1 addition & 0 deletions tests/util/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,4 @@ def mock_choice(choices):
mock_random.SystemRandom.return_value = generator

assert util.get_random_string(length=3) == 'ABC'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blank line at end of file