-
-
Notifications
You must be signed in to change notification settings - Fork 32k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
InfluxDB send retry after IOError #10263
Changes from 14 commits
8e3ad10
a37a249
5ca3151
b05d452
514a3ac
570eaaa
b7088ac
5794d94
eeeaf64
3810cf4
1546ceb
18204e7
bdbf9d0
2cb43ed
9ab35ee
7fd1252
9ac1a53
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -312,3 +312,4 @@ def wrapper(*args, **kwargs): | |
throttle[0].release() | ||
|
||
return wrapper | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. blank line at end of file |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,13 @@ | |
import datetime | ||
from unittest import mock | ||
|
||
from datetime import timedelta | ||
from unittest.mock import MagicMock | ||
|
||
import influxdb as influx_client | ||
|
||
from homeassistant.util import dt as dt_util | ||
from homeassistant import core as ha | ||
from homeassistant.setup import setup_component | ||
import homeassistant.components.influxdb as influxdb | ||
from homeassistant.const import EVENT_STATE_CHANGED, STATE_OFF, STATE_ON, \ | ||
|
@@ -36,6 +41,7 @@ def test_setup_config_full(self, mock_client): | |
'database': 'db', | ||
'username': 'user', | ||
'password': 'password', | ||
'max_retries': 4, | ||
'ssl': 'False', | ||
'verify_ssl': 'False', | ||
} | ||
|
@@ -91,7 +97,7 @@ def test_setup_query_fail(self, mock_client): | |
influx_client.exceptions.InfluxDBClientError('fake') | ||
assert not setup_component(self.hass, influxdb.DOMAIN, config) | ||
|
||
def _setup(self): | ||
def _setup(self, **kwargs): | ||
"""Setup the client.""" | ||
config = { | ||
'influxdb': { | ||
|
@@ -104,6 +110,7 @@ def _setup(self): | |
} | ||
} | ||
} | ||
config['influxdb'].update(kwargs) | ||
assert setup_component(self.hass, influxdb.DOMAIN, config) | ||
self.handler_method = self.hass.bus.listen.call_args_list[0][0][1] | ||
|
||
|
@@ -649,3 +656,164 @@ def test_event_listener_component_override_measurement(self, mock_client): | |
mock.call(body) | ||
) | ||
mock_client.return_value.write_points.reset_mock() | ||
|
||
def test_scheduled_write(self, mock_client): | ||
"""Test the event listener to retry after write failures.""" | ||
self._setup(max_retries=1) | ||
|
||
state = mock.MagicMock( | ||
state=1, domain='fake', entity_id='entity.id', object_id='entity', | ||
attributes={}) | ||
event = mock.MagicMock(data={'new_state': state}, time_fired=12345) | ||
mock_client.return_value.write_points.side_effect = \ | ||
IOError('foo') | ||
|
||
start = dt_util.utcnow() | ||
|
||
self.handler_method(event) | ||
json_data = mock_client.return_value.write_points.call_args[0][0] | ||
self.assertEqual(mock_client.return_value.write_points.call_count, 1) | ||
|
||
shifted_time = start + (timedelta(seconds=20 + 1)) | ||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, | ||
{ha.ATTR_NOW: shifted_time}) | ||
self.hass.block_till_done() | ||
self.assertEqual(mock_client.return_value.write_points.call_count, 2) | ||
mock_client.return_value.write_points.assert_called_with(json_data) | ||
|
||
shifted_time = shifted_time + (timedelta(seconds=20 + 1)) | ||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, | ||
{ha.ATTR_NOW: shifted_time}) | ||
self.hass.block_till_done() | ||
self.assertEqual(mock_client.return_value.write_points.call_count, 2) | ||
|
||
|
||
class TestRetryOnErrorDecorator(unittest.TestCase): | ||
"""Test the RetryOnError decorator.""" | ||
|
||
def setUp(self): | ||
"""Setup things to be run when tests are started.""" | ||
self.hass = get_test_home_assistant() | ||
|
||
def tearDown(self): | ||
"""Clear data.""" | ||
self.hass.stop() | ||
|
||
def test_no_retry(self): | ||
"""Test that it does not retry if configured.""" | ||
mock_method = MagicMock() | ||
wrapped = influxdb.RetryOnError(self.hass)(mock_method) | ||
wrapped(1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 1) | ||
mock_method.assert_called_with(1, 2, test=3) | ||
|
||
mock_method.side_effect = Exception() | ||
self.assertRaises(Exception, wrapped, 1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 2) | ||
mock_method.assert_called_with(1, 2, test=3) | ||
|
||
def test_single_retry(self): | ||
"""Test that retry stops after a single try if configured.""" | ||
mock_method = MagicMock() | ||
retryer = influxdb.RetryOnError(self.hass, retry_limit=1) | ||
wrapped = retryer(mock_method) | ||
wrapped(1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 1) | ||
mock_method.assert_called_with(1, 2, test=3) | ||
|
||
start = dt_util.utcnow() | ||
shifted_time = start + (timedelta(seconds=20 + 1)) | ||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, | ||
{ha.ATTR_NOW: shifted_time}) | ||
self.hass.block_till_done() | ||
self.assertEqual(mock_method.call_count, 1) | ||
|
||
mock_method.side_effect = Exception() | ||
wrapped(1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 2) | ||
mock_method.assert_called_with(1, 2, test=3) | ||
|
||
for cnt in range(3): | ||
start = dt_util.utcnow() | ||
shifted_time = start + (timedelta(seconds=20 + 1)) | ||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, | ||
{ha.ATTR_NOW: shifted_time}) | ||
self.hass.block_till_done() | ||
self.assertEqual(mock_method.call_count, 3) | ||
mock_method.assert_called_with(1, 2, test=3) | ||
|
||
def test_multi_retry(self): | ||
"""Test that multiple retries work.""" | ||
mock_method = MagicMock() | ||
retryer = influxdb.RetryOnError(self.hass, retry_limit=4) | ||
wrapped = retryer(mock_method) | ||
mock_method.side_effect = Exception() | ||
|
||
wrapped(1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 1) | ||
mock_method.assert_called_with(1, 2, test=3) | ||
|
||
for cnt in range(3): | ||
start = dt_util.utcnow() | ||
shifted_time = start + (timedelta(seconds=20 + 1)) | ||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, | ||
{ha.ATTR_NOW: shifted_time}) | ||
self.hass.block_till_done() | ||
self.assertEqual(mock_method.call_count, cnt + 2) | ||
mock_method.assert_called_with(1, 2, test=3) | ||
|
||
def test_max_queue(self): | ||
"""Test the maximum queue length.""" | ||
# make a wrapped method | ||
mock_method = MagicMock() | ||
retryer = influxdb.RetryOnError( | ||
self.hass, retry_limit=4, queue_limit=3) | ||
wrapped = retryer(mock_method) | ||
mock_method.side_effect = Exception() | ||
|
||
# call it once, call fails, queue fills to 1 | ||
wrapped(1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 1) | ||
mock_method.assert_called_with(1, 2, test=3) | ||
self.assertEqual(len(wrapped._retry_queue), 1) | ||
|
||
# two more calls that failed. queue is 3 | ||
wrapped(1, 2, test=3) | ||
wrapped(1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 3) | ||
self.assertEqual(len(wrapped._retry_queue), 3) | ||
|
||
# another call, queue gets limited to 3 | ||
wrapped(1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 4) | ||
self.assertEqual(len(wrapped._retry_queue), 3) | ||
|
||
# time passes | ||
start = dt_util.utcnow() | ||
shifted_time = start + (timedelta(seconds=20 + 1)) | ||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, | ||
{ha.ATTR_NOW: shifted_time}) | ||
self.hass.block_till_done() | ||
|
||
# only the three queued calls where repeated | ||
self.assertEqual(mock_method.call_count, 7) | ||
self.assertEqual(len(wrapped._retry_queue), 3) | ||
|
||
# another call, queue stays limited | ||
wrapped(1, 2, test=3) | ||
self.assertEqual(mock_method.call_count, 8) | ||
self.assertEqual(len(wrapped._retry_queue), 3) | ||
|
||
# disable the side effect | ||
mock_method.side_effect = None | ||
|
||
# time passes, all calls should succeed | ||
start = dt_util.utcnow() | ||
shifted_time = start + (timedelta(seconds=20 + 1)) | ||
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, | ||
{ha.ATTR_NOW: shifted_time}) | ||
self.hass.block_till_done() | ||
|
||
# three queued calls succeeded, queue empty. | ||
self.assertEqual(mock_method.call_count, 11) | ||
self.assertEqual(len(wrapped._retry_queue), 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no newline at end of file |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -280,3 +280,4 @@ def mock_choice(choices): | |
mock_random.SystemRandom.return_value = generator | ||
|
||
assert util.get_random_string(length=3) == 'ABC' | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. blank line at end of file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no newline at end of file