diff --git a/homeassistant/components/influxdb.py b/homeassistant/components/influxdb.py
index 55b0f08a711..d31d1e96431 100644
--- a/homeassistant/components/influxdb.py
+++ b/homeassistant/components/influxdb.py
@@ -4,6 +4,8 @@ A component which allows you to send data to an Influx database.
 For more details about this component, please refer to the documentation at
 https://home-assistant.io/components/influxdb/
 """
+from datetime import timedelta
+from functools import wraps, partial
 import logging
 import re
 
@@ -16,6 +18,7 @@ from homeassistant.const import (
     CONF_EXCLUDE, CONF_INCLUDE, CONF_DOMAINS, CONF_ENTITIES)
 from homeassistant.helpers import state as state_helper
 from homeassistant.helpers.entity_values import EntityValues
+from homeassistant.util import utcnow
 import homeassistant.helpers.config_validation as cv
 
 REQUIREMENTS = ['influxdb==4.1.1']
@@ -30,6 +33,8 @@ CONF_TAGS_ATTRIBUTES = 'tags_attributes'
 CONF_COMPONENT_CONFIG = 'component_config'
 CONF_COMPONENT_CONFIG_GLOB = 'component_config_glob'
 CONF_COMPONENT_CONFIG_DOMAIN = 'component_config_domain'
+CONF_RETRY_COUNT = 'max_retries'
+CONF_RETRY_QUEUE = 'retry_queue_limit'
 
 DEFAULT_DATABASE = 'home_assistant'
 DEFAULT_VERIFY_SSL = True
@@ -58,6 +63,8 @@ CONFIG_SCHEMA = vol.Schema({
         vol.Optional(CONF_DB_NAME, default=DEFAULT_DATABASE): cv.string,
         vol.Optional(CONF_PORT): cv.port,
         vol.Optional(CONF_SSL): cv.boolean,
+        vol.Optional(CONF_RETRY_COUNT, default=0): cv.positive_int,
+        vol.Optional(CONF_RETRY_QUEUE, default=20): cv.positive_int,
         vol.Optional(CONF_DEFAULT_MEASUREMENT): cv.string,
         vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
         vol.Optional(CONF_TAGS, default={}):
@@ -119,6 +126,8 @@ def setup(hass, config):
         conf[CONF_COMPONENT_CONFIG],
         conf[CONF_COMPONENT_CONFIG_DOMAIN],
         conf[CONF_COMPONENT_CONFIG_GLOB])
+    max_tries = conf.get(CONF_RETRY_COUNT)
+    queue_limit = conf.get(CONF_RETRY_QUEUE)
 
     try:
         influx = InfluxDBClient(**kwargs)
@@ -213,6 +222,11 @@ def setup(hass, config):
 
         json_body[0]['tags'].update(tags)
 
+        _write_data(json_body)
+
+    @RetryOnError(hass, retry_limit=max_tries, retry_delay=20,
+                  queue_limit=queue_limit)
+    def _write_data(json_body):
         try:
             influx.write_points(json_body)
         except exceptions.InfluxDBClientError:
@@ -221,3 +235,79 @@ def setup(hass, config):
     hass.bus.listen(EVENT_STATE_CHANGED, influx_event_listener)
 
     return True
+
+
+class RetryOnError(object):
+    """A class for retrying a failed task a limited number of times.
+
+    This method decorator makes a method retry on errors. If the call raises
+    an uncaught exception, another attempt is scheduled after a retry delay,
+    up to the maximum number of retries.
+
+    It is meant for transient, "self-healing" problems such as network
+    outages. Retries are scheduled through Home Assistant's own scheduler.
+
+    It takes a Hass instance, a maximum number of retries and a retry delay
+    in seconds as arguments.
+
+    The queue limit is the maximum number of calls that may be queued for
+    retry at a time. Once this limit is reached, every new call discards
+    the oldest queued one.
+    """
+
+    def __init__(self, hass, retry_limit=0, retry_delay=20, queue_limit=100):
+        """Initialize the decorator."""
+        self.hass = hass
+        self.retry_limit = retry_limit
+        self.retry_delay = timedelta(seconds=retry_delay)
+        self.queue_limit = queue_limit
+
+    def __call__(self, method):
+        """Decorate the target method."""
+        from homeassistant.helpers.event import track_point_in_utc_time
+
+        @wraps(method)
+        def wrapper(*args, **kwargs):
+            """Wrapped method."""
+            # pylint: disable=protected-access
+            if not hasattr(wrapper, "_retry_queue"):
+                wrapper._retry_queue = []
+
+            def scheduled(retry=0, untrack=None, event=None):
+                """Call the target method.
+
+                It is called directly for the first attempt and is then
+                scheduled to run again in the Hass main loop for retries.
+                """
+                if untrack is not None:
+                    wrapper._retry_queue.remove(untrack)
+
+                # pylint: disable=broad-except
+                try:
+                    method(*args, **kwargs)
+                except Exception as ex:
+                    if retry == self.retry_limit:
+                        raise
+                    if len(wrapper._retry_queue) >= self.queue_limit:
+                        last = wrapper._retry_queue.pop(0)
+                        if 'remove' in last:
+                            func = last['remove']
+                            func()
+                        if 'exc' in last:
+                            _LOGGER.error(
+                                "Retry queue overflow, drop oldest entry: %s",
+                                str(last['exc']))
+
+                    target = utcnow() + self.retry_delay
+                    tracking = {'target': target}
+                    remove = track_point_in_utc_time(self.hass,
+                                                     partial(scheduled,
+                                                             retry + 1,
+                                                             tracking),
+                                                     target)
+                    tracking['remove'] = remove
+                    tracking['exc'] = ex
+                    wrapper._retry_queue.append(tracking)
+
+            scheduled()
+        return wrapper
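
Note (not part of the patch): for readers reviewing the change, the following is a minimal, standalone sketch of the retry-and-bounded-queue pattern that the RetryOnError decorator above implements. It is illustrative only: threading.Timer stands in for Home Assistant's track_point_in_utc_time scheduler, and the names retry_on_error and write_batch are made up for the example.

import threading
from functools import wraps


def retry_on_error(retry_limit=0, retry_delay=20.0, queue_limit=100):
    """Build a decorator that retries a failing call after a delay."""
    def decorator(method):
        pending = []  # queued retries, oldest first

        @wraps(method)
        def wrapper(*args, **kwargs):
            def attempt(retry=0, tracked=None):
                # A scheduled retry removes itself from the queue when it runs.
                if tracked is not None:
                    pending.remove(tracked)
                try:
                    method(*args, **kwargs)
                except Exception:
                    if retry == retry_limit:
                        raise
                    if len(pending) >= queue_limit:
                        # Bound memory use: drop the oldest queued retry.
                        pending.pop(0)['timer'].cancel()
                    tracking = {}
                    timer = threading.Timer(
                        retry_delay, attempt, args=(retry + 1, tracking))
                    tracking['timer'] = timer
                    pending.append(tracking)
                    timer.start()

            attempt()
        return wrapper
    return decorator


@retry_on_error(retry_limit=3, retry_delay=5.0, queue_limit=10)
def write_batch(points):
    """Pretend to write a batch; a transient error triggers a retry."""
    print('writing', points)

The real component keeps the same shape but lets Home Assistant's event loop fire the retry at utcnow() + retry_delay, which is what the tests below exercise by firing time-changed events.
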
+ """ + + def __init__(self, hass, retry_limit=0, retry_delay=20, queue_limit=100): + """Initialize the decorator.""" + self.hass = hass + self.retry_limit = retry_limit + self.retry_delay = timedelta(seconds=retry_delay) + self.queue_limit = queue_limit + + def __call__(self, method): + """Decorate the target method.""" + from homeassistant.helpers.event import track_point_in_utc_time + + @wraps(method) + def wrapper(*args, **kwargs): + """Wrapped method.""" + # pylint: disable=protected-access + if not hasattr(wrapper, "_retry_queue"): + wrapper._retry_queue = [] + + def scheduled(retry=0, untrack=None, event=None): + """Call the target method. + + It is called directly at the first time and then called + scheduled within the Hass mainloop. + """ + if untrack is not None: + wrapper._retry_queue.remove(untrack) + + # pylint: disable=broad-except + try: + method(*args, **kwargs) + except Exception as ex: + if retry == self.retry_limit: + raise + if len(wrapper._retry_queue) >= self.queue_limit: + last = wrapper._retry_queue.pop(0) + if 'remove' in last: + func = last['remove'] + func() + if 'exc' in last: + _LOGGER.error( + "Retry queue overflow, drop oldest entry: %s", + str(last['exc'])) + + target = utcnow() + self.retry_delay + tracking = {'target': target} + remove = track_point_in_utc_time(self.hass, + partial(scheduled, + retry + 1, + tracking), + target) + tracking['remove'] = remove + tracking["exc"] = ex + wrapper._retry_queue.append(tracking) + + scheduled() + return wrapper diff --git a/tests/components/test_influxdb.py b/tests/components/test_influxdb.py index 6c52663051c..d768136592e 100644 --- a/tests/components/test_influxdb.py +++ b/tests/components/test_influxdb.py @@ -3,8 +3,13 @@ import unittest import datetime from unittest import mock +from datetime import timedelta +from unittest.mock import MagicMock + import influxdb as influx_client +from homeassistant.util import dt as dt_util +from homeassistant import core as ha from homeassistant.setup import setup_component import homeassistant.components.influxdb as influxdb from homeassistant.const import EVENT_STATE_CHANGED, STATE_OFF, STATE_ON, \ @@ -36,6 +41,7 @@ class TestInfluxDB(unittest.TestCase): 'database': 'db', 'username': 'user', 'password': 'password', + 'max_retries': 4, 'ssl': 'False', 'verify_ssl': 'False', } @@ -91,7 +97,7 @@ class TestInfluxDB(unittest.TestCase): influx_client.exceptions.InfluxDBClientError('fake') assert not setup_component(self.hass, influxdb.DOMAIN, config) - def _setup(self): + def _setup(self, **kwargs): """Setup the client.""" config = { 'influxdb': { @@ -104,6 +110,7 @@ class TestInfluxDB(unittest.TestCase): } } } + config['influxdb'].update(kwargs) assert setup_component(self.hass, influxdb.DOMAIN, config) self.handler_method = self.hass.bus.listen.call_args_list[0][0][1] @@ -649,3 +656,164 @@ class TestInfluxDB(unittest.TestCase): mock.call(body) ) mock_client.return_value.write_points.reset_mock() + + def test_scheduled_write(self, mock_client): + """Test the event listener to retry after write failures.""" + self._setup(max_retries=1) + + state = mock.MagicMock( + state=1, domain='fake', entity_id='entity.id', object_id='entity', + attributes={}) + event = mock.MagicMock(data={'new_state': state}, time_fired=12345) + mock_client.return_value.write_points.side_effect = \ + IOError('foo') + + start = dt_util.utcnow() + + self.handler_method(event) + json_data = mock_client.return_value.write_points.call_args[0][0] + 
+
+
+class TestRetryOnErrorDecorator(unittest.TestCase):
+    """Test the RetryOnError decorator."""
+
+    def setUp(self):
+        """Setup things to be run when tests are started."""
+        self.hass = get_test_home_assistant()
+
+    def tearDown(self):
+        """Clear data."""
+        self.hass.stop()
+
+    def test_no_retry(self):
+        """Test that it does not retry when no retries are configured."""
+        mock_method = MagicMock()
+        wrapped = influxdb.RetryOnError(self.hass)(mock_method)
+        wrapped(1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 1)
+        mock_method.assert_called_with(1, 2, test=3)
+
+        mock_method.side_effect = Exception()
+        self.assertRaises(Exception, wrapped, 1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 2)
+        mock_method.assert_called_with(1, 2, test=3)
+
+    def test_single_retry(self):
+        """Test that it retries exactly once when retry_limit is 1."""
+        mock_method = MagicMock()
+        retryer = influxdb.RetryOnError(self.hass, retry_limit=1)
+        wrapped = retryer(mock_method)
+        wrapped(1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 1)
+        mock_method.assert_called_with(1, 2, test=3)
+
+        start = dt_util.utcnow()
+        shifted_time = start + (timedelta(seconds=20 + 1))
+        self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
+                           {ha.ATTR_NOW: shifted_time})
+        self.hass.block_till_done()
+        self.assertEqual(mock_method.call_count, 1)
+
+        mock_method.side_effect = Exception()
+        wrapped(1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 2)
+        mock_method.assert_called_with(1, 2, test=3)
+
+        for cnt in range(3):
+            start = dt_util.utcnow()
+            shifted_time = start + (timedelta(seconds=20 + 1))
+            self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
+                               {ha.ATTR_NOW: shifted_time})
+            self.hass.block_till_done()
+            self.assertEqual(mock_method.call_count, 3)
+            mock_method.assert_called_with(1, 2, test=3)
+
+    def test_multi_retry(self):
+        """Test that multiple retries work."""
+        mock_method = MagicMock()
+        retryer = influxdb.RetryOnError(self.hass, retry_limit=4)
+        wrapped = retryer(mock_method)
+        mock_method.side_effect = Exception()
+
+        wrapped(1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 1)
+        mock_method.assert_called_with(1, 2, test=3)
+
+        for cnt in range(3):
+            start = dt_util.utcnow()
+            shifted_time = start + (timedelta(seconds=20 + 1))
+            self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
+                               {ha.ATTR_NOW: shifted_time})
+            self.hass.block_till_done()
+            self.assertEqual(mock_method.call_count, cnt + 2)
+            mock_method.assert_called_with(1, 2, test=3)
+
+    def test_max_queue(self):
+        """Test the maximum queue length."""
+        # make a wrapped method
+        mock_method = MagicMock()
+        retryer = influxdb.RetryOnError(
+            self.hass, retry_limit=4, queue_limit=3)
+        wrapped = retryer(mock_method)
+        mock_method.side_effect = Exception()
+
+        # call it once, the call fails, the queue fills to 1
+        wrapped(1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 1)
+        mock_method.assert_called_with(1, 2, test=3)
+        self.assertEqual(len(wrapped._retry_queue), 1)
+
+        # two more failing calls; the queue grows to 3
+        wrapped(1, 2, test=3)
+        wrapped(1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 3)
+        self.assertEqual(len(wrapped._retry_queue), 3)
+
+        # another call, the queue stays capped at 3
+        wrapped(1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 4)
+        self.assertEqual(len(wrapped._retry_queue), 3)
+
+        # time passes
+        start = dt_util.utcnow()
+        shifted_time = start + (timedelta(seconds=20 + 1))
+        self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
+                           {ha.ATTR_NOW: shifted_time})
+        self.hass.block_till_done()
+
+        # only the three queued calls were retried
+        self.assertEqual(mock_method.call_count, 7)
+        self.assertEqual(len(wrapped._retry_queue), 3)
+
+        # another call, the queue stays limited
+        wrapped(1, 2, test=3)
+        self.assertEqual(mock_method.call_count, 8)
+        self.assertEqual(len(wrapped._retry_queue), 3)
+
+        # disable the side effect
+        mock_method.side_effect = None
+
+        # time passes, all queued calls should now succeed
+        start = dt_util.utcnow()
+        shifted_time = start + (timedelta(seconds=20 + 1))
+        self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
+                           {ha.ATTR_NOW: shifted_time})
+        self.hass.block_till_done()
+
+        # the three queued calls succeeded and the queue is empty
+        self.assertEqual(mock_method.call_count, 11)
+        self.assertEqual(len(wrapped._retry_queue), 0)
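
Note (not part of the patch): one idiom repeated throughout the new tests is advancing Home Assistant's clock past the 20-second retry delay and letting the bus settle so the scheduled retry fires. A small helper along these lines could factor that out; this is only a suggestion sketched against the same test-facing APIs used above (dt_util, ha.EVENT_TIME_CHANGED, block_till_done), and advance_time is a made-up name.

from datetime import timedelta

from homeassistant import core as ha
from homeassistant.util import dt as dt_util


def advance_time(hass, seconds=21):
    """Fire a time-changed event `seconds` in the future and wait for it."""
    shifted = dt_util.utcnow() + timedelta(seconds=seconds)
    hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: shifted})
    hass.block_till_done()

Each start / shifted_time / fire / block_till_done block in the tests above would then collapse to a single advance_time(self.hass) call.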