From 6265d1b747c2953136a618b8d999baa4c0531727 Mon Sep 17 00:00:00 2001 From: Anders Melchiorsen Date: Thu, 8 Feb 2018 12:25:26 +0100 Subject: [PATCH] Avoid influxdb filling connection pool (#12182) * Add a processing queue to influxdb * Updates after reviews * Remove lint * Move retry loop to thread class * Move constant calculation out of loop * Deprecate retry_queue_limit --- homeassistant/components/influxdb.py | 153 +++++++++---------- tests/components/test_influxdb.py | 214 +++++++-------------------- 2 files changed, 128 insertions(+), 239 deletions(-) diff --git a/homeassistant/components/influxdb.py b/homeassistant/components/influxdb.py index 30a47828cc7..526b8057ce1 100644 --- a/homeassistant/components/influxdb.py +++ b/homeassistant/components/influxdb.py @@ -4,10 +4,11 @@ A component which allows you to send data to an Influx database. For more details about this component, please refer to the documentation at https://home-assistant.io/components/influxdb/ """ -from datetime import timedelta -from functools import partial, wraps import logging import re +import queue +import threading +import time import requests.exceptions import voluptuous as vol @@ -15,11 +16,11 @@ import voluptuous as vol from homeassistant.const import ( CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_HOST, CONF_INCLUDE, CONF_PASSWORD, CONF_PORT, CONF_SSL, CONF_USERNAME, CONF_VERIFY_SSL, - EVENT_STATE_CHANGED, STATE_UNAVAILABLE, STATE_UNKNOWN) + EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_STOP, STATE_UNAVAILABLE, + STATE_UNKNOWN) from homeassistant.helpers import state as state_helper import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entity_values import EntityValues -from homeassistant.util import utcnow REQUIREMENTS = ['influxdb==5.0.0'] @@ -41,13 +42,15 @@ DEFAULT_VERIFY_SSL = True DOMAIN = 'influxdb' TIMEOUT = 5 +RETRY_DELAY = 20 +QUEUE_BACKLOG_SECONDS = 10 COMPONENT_CONFIG_SCHEMA_ENTRY = vol.Schema({ vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string, }) CONFIG_SCHEMA = vol.Schema({ - DOMAIN: vol.Schema({ + DOMAIN: vol.All(cv.deprecated(CONF_RETRY_QUEUE), vol.Schema({ vol.Optional(CONF_HOST): cv.string, vol.Inclusive(CONF_USERNAME, 'authentication'): cv.string, vol.Inclusive(CONF_PASSWORD, 'authentication'): cv.string, @@ -79,7 +82,7 @@ CONFIG_SCHEMA = vol.Schema({ vol.Schema({cv.string: COMPONENT_CONFIG_SCHEMA_ENTRY}), vol.Optional(CONF_COMPONENT_CONFIG_DOMAIN, default={}): vol.Schema({cv.string: COMPONENT_CONFIG_SCHEMA_ENTRY}), - }), + })), }, extra=vol.ALLOW_EXTRA) RE_DIGIT_TAIL = re.compile(r'^[^\.]*\d+\.?\d+[^\.]*$') @@ -128,7 +131,6 @@ def setup(hass, config): conf[CONF_COMPONENT_CONFIG_DOMAIN], conf[CONF_COMPONENT_CONFIG_GLOB]) max_tries = conf.get(CONF_RETRY_COUNT) - queue_limit = conf.get(CONF_RETRY_QUEUE) try: influx = InfluxDBClient(**kwargs) @@ -141,18 +143,18 @@ def setup(hass, config): "READ/WRITE", exc) return False - def influx_event_listener(event): - """Listen for new messages on the bus and sends them to Influx.""" + def influx_handle_event(event): + """Send an event to Influx.""" state = event.data.get('new_state') if state is None or state.state in ( STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \ state.entity_id in blacklist_e or state.domain in blacklist_d: - return + return True try: if (whitelist_e and state.entity_id not in whitelist_e) or \ (whitelist_d and state.domain not in whitelist_d): - return + return True _include_state = _include_value = False @@ -222,91 +224,78 @@ def setup(hass, config): json_body[0]['tags'].update(tags) - _write_data(json_body) - - @RetryOnError(hass, retry_limit=max_tries, retry_delay=20, - queue_limit=queue_limit) - def _write_data(json_body): - """Write the data.""" try: influx.write_points(json_body) - except exceptions.InfluxDBClientError: - _LOGGER.exception("Error saving event %s to InfluxDB", json_body) + return True + except (exceptions.InfluxDBClientError, IOError): + return False - hass.bus.listen(EVENT_STATE_CHANGED, influx_event_listener) + instance = hass.data[DOMAIN] = InfluxThread( + hass, influx_handle_event, max_tries) + instance.start() + + def shutdown(event): + """Shut down the thread.""" + instance.queue.put(None) + instance.join() + + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) return True -class RetryOnError(object): - """A class for retrying a failed task a certain amount of tries. +class InfluxThread(threading.Thread): + """A threaded event handler class.""" - This method decorator makes a method retrying on errors. If there was an - uncaught exception, it schedules another try to execute the task after a - retry delay. It does this up to the maximum number of retries. + def __init__(self, hass, event_handler, max_tries): + """Initialize the listener.""" + threading.Thread.__init__(self, name='InfluxDB') + self.queue = queue.Queue() + self.event_handler = event_handler + self.max_tries = max_tries + hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener) - It can be used for all probable "self-healing" problems like network - outages. The task will be rescheduled using HAs scheduling mechanism. + def _event_listener(self, event): + """Listen for new messages on the bus and queue them for Influx.""" + item = (time.monotonic(), event) + self.queue.put(item) - It takes a Hass instance, a maximum number of retries and a retry delay - in seconds as arguments. + def run(self): + """Process incoming events.""" + queue_seconds = QUEUE_BACKLOG_SECONDS + self.max_tries*RETRY_DELAY - The queue limit defines the maximum number of calls that are allowed to - be queued at a time. If this number is reached, every new call discards - an old one. - """ + write_error = False + dropped = False - def __init__(self, hass, retry_limit=0, retry_delay=20, queue_limit=100): - """Initialize the decorator.""" - self.hass = hass - self.retry_limit = retry_limit - self.retry_delay = timedelta(seconds=retry_delay) - self.queue_limit = queue_limit + while True: + item = self.queue.get() - def __call__(self, method): - """Decorate the target method.""" - from homeassistant.helpers.event import track_point_in_utc_time + if item is None: + self.queue.task_done() + return - @wraps(method) - def wrapper(*args, **kwargs): - """Wrap method.""" - # pylint: disable=protected-access - if not hasattr(wrapper, "_retry_queue"): - wrapper._retry_queue = [] + timestamp, event = item + age = time.monotonic() - timestamp - def scheduled(retry=0, untrack=None, event=None): - """Call the target method. + if age < queue_seconds: + for retry in range(self.max_tries+1): + if self.event_handler(event): + if write_error: + _LOGGER.error("Resumed writing to InfluxDB") + write_error = False + dropped = False + break + elif retry < self.max_tries: + time.sleep(RETRY_DELAY) + elif not write_error: + _LOGGER.error("Error writing to InfluxDB") + write_error = True + elif not dropped: + _LOGGER.warning("Dropping old events to catch up") + dropped = True - It is called directly at the first time and then called - scheduled within the Hass mainloop. - """ - if untrack is not None: - wrapper._retry_queue.remove(untrack) + self.queue.task_done() - # pylint: disable=broad-except - try: - method(*args, **kwargs) - except Exception as ex: - if retry == self.retry_limit: - raise - if len(wrapper._retry_queue) >= self.queue_limit: - last = wrapper._retry_queue.pop(0) - if 'remove' in last: - func = last['remove'] - func() - if 'exc' in last: - _LOGGER.error( - "Retry queue overflow, drop oldest entry: %s", - str(last['exc'])) - - target = utcnow() + self.retry_delay - tracking = {'target': target} - remove = track_point_in_utc_time( - self.hass, partial(scheduled, retry + 1, tracking), - target) - tracking['remove'] = remove - tracking["exc"] = ex - wrapper._retry_queue.append(tracking) - - scheduled() - return wrapper + def block_till_done(self): + """Block till all events processed.""" + self.queue.join() diff --git a/tests/components/test_influxdb.py b/tests/components/test_influxdb.py index 0ec5f973ee4..4d12e436c02 100644 --- a/tests/components/test_influxdb.py +++ b/tests/components/test_influxdb.py @@ -1,15 +1,10 @@ """The tests for the InfluxDB component.""" -import unittest import datetime +import unittest from unittest import mock -from datetime import timedelta -from unittest.mock import MagicMock - import influxdb as influx_client -from homeassistant.util import dt as dt_util -from homeassistant import core as ha from homeassistant.setup import setup_component import homeassistant.components.influxdb as influxdb from homeassistant.const import EVENT_STATE_CHANGED, STATE_OFF, STATE_ON, \ @@ -169,6 +164,8 @@ class TestInfluxDB(unittest.TestCase): body[0]['fields']['value'] = out[1] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() + self.assertEqual( mock_client.return_value.write_points.call_count, 1 ) @@ -203,6 +200,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() self.assertEqual( mock_client.return_value.write_points.call_count, 1 ) @@ -212,18 +210,6 @@ class TestInfluxDB(unittest.TestCase): ) mock_client.return_value.write_points.reset_mock() - def test_event_listener_fail_write(self, mock_client): - """Test the event listener for write failures.""" - self._setup() - - state = mock.MagicMock( - state=1, domain='fake', entity_id='fake.entity-id', - object_id='entity', attributes={}) - event = mock.MagicMock(data={'new_state': state}, time_fired=12345) - mock_client.return_value.write_points.side_effect = \ - influx_client.exceptions.InfluxDBClientError('foo') - self.handler_method(event) - def test_event_listener_states(self, mock_client): """Test the event listener against ignored states.""" self._setup() @@ -245,6 +231,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() if state_state == 1: self.assertEqual( mock_client.return_value.write_points.call_count, 1 @@ -278,6 +265,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() if entity_id == 'ok': self.assertEqual( mock_client.return_value.write_points.call_count, 1 @@ -312,6 +300,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() if domain == 'ok': self.assertEqual( mock_client.return_value.write_points.call_count, 1 @@ -356,6 +345,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() if entity_id == 'included': self.assertEqual( mock_client.return_value.write_points.call_count, 1 @@ -401,6 +391,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() if domain == 'fake': self.assertEqual( mock_client.return_value.write_points.call_count, 1 @@ -456,6 +447,7 @@ class TestInfluxDB(unittest.TestCase): body[0]['fields']['value'] = out[1] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() self.assertEqual( mock_client.return_value.write_points.call_count, 1 ) @@ -498,6 +490,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() if entity_id == 'ok': self.assertEqual( mock_client.return_value.write_points.call_count, 1 @@ -543,6 +536,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() self.assertEqual( mock_client.return_value.write_points.call_count, 1 ) @@ -588,6 +582,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() self.assertEqual( mock_client.return_value.write_points.call_count, 1 ) @@ -648,6 +643,7 @@ class TestInfluxDB(unittest.TestCase): }, }] self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() self.assertEqual( mock_client.return_value.write_points.call_count, 1 ) @@ -659,7 +655,16 @@ class TestInfluxDB(unittest.TestCase): def test_scheduled_write(self, mock_client): """Test the event listener to retry after write failures.""" - self._setup(max_retries=1) + config = { + 'influxdb': { + 'host': 'host', + 'username': 'user', + 'password': 'pass', + 'max_retries': 1 + } + } + assert setup_component(self.hass, influxdb.DOMAIN, config) + self.handler_method = self.hass.bus.listen.call_args_list[0][0][1] state = mock.MagicMock( state=1, domain='fake', entity_id='entity.id', object_id='entity', @@ -668,152 +673,47 @@ class TestInfluxDB(unittest.TestCase): mock_client.return_value.write_points.side_effect = \ IOError('foo') - start = dt_util.utcnow() - - self.handler_method(event) + # Write fails + with mock.patch.object(influxdb.time, 'sleep') as mock_sleep: + self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() + assert mock_sleep.called json_data = mock_client.return_value.write_points.call_args[0][0] - self.assertEqual(mock_client.return_value.write_points.call_count, 1) - - shifted_time = start + (timedelta(seconds=20 + 1)) - self.hass.bus.fire(ha.EVENT_TIME_CHANGED, - {ha.ATTR_NOW: shifted_time}) - self.hass.block_till_done() self.assertEqual(mock_client.return_value.write_points.call_count, 2) mock_client.return_value.write_points.assert_called_with(json_data) - shifted_time = shifted_time + (timedelta(seconds=20 + 1)) - self.hass.bus.fire(ha.EVENT_TIME_CHANGED, - {ha.ATTR_NOW: shifted_time}) - self.hass.block_till_done() - self.assertEqual(mock_client.return_value.write_points.call_count, 2) + # Write works again + mock_client.return_value.write_points.side_effect = None + with mock.patch.object(influxdb.time, 'sleep') as mock_sleep: + self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() + assert not mock_sleep.called + self.assertEqual(mock_client.return_value.write_points.call_count, 3) + def test_queue_backlog_full(self, mock_client): + """Test the event listener to drop old events.""" + self._setup() -class TestRetryOnErrorDecorator(unittest.TestCase): - """Test the RetryOnError decorator.""" + state = mock.MagicMock( + state=1, domain='fake', entity_id='entity.id', object_id='entity', + attributes={}) + event = mock.MagicMock(data={'new_state': state}, time_fired=12345) - def setUp(self): - """Setup things to be run when tests are started.""" - self.hass = get_test_home_assistant() + monotonic_time = 0 - def tearDown(self): - """Clear data.""" - self.hass.stop() + def fast_monotonic(): + """Monotonic time that ticks fast enough to cause a timeout.""" + nonlocal monotonic_time + monotonic_time += 60 + return monotonic_time - def test_no_retry(self): - """Test that it does not retry if configured.""" - mock_method = MagicMock() - wrapped = influxdb.RetryOnError(self.hass)(mock_method) - wrapped(1, 2, test=3) - self.assertEqual(mock_method.call_count, 1) - mock_method.assert_called_with(1, 2, test=3) + with mock.patch('homeassistant.components.influxdb.time.monotonic', + new=fast_monotonic): + self.handler_method(event) + self.hass.data[influxdb.DOMAIN].block_till_done() - mock_method.side_effect = Exception() - self.assertRaises(Exception, wrapped, 1, 2, test=3) - self.assertEqual(mock_method.call_count, 2) - mock_method.assert_called_with(1, 2, test=3) + self.assertEqual( + mock_client.return_value.write_points.call_count, 0 + ) - def test_single_retry(self): - """Test that retry stops after a single try if configured.""" - mock_method = MagicMock() - retryer = influxdb.RetryOnError(self.hass, retry_limit=1) - wrapped = retryer(mock_method) - wrapped(1, 2, test=3) - self.assertEqual(mock_method.call_count, 1) - mock_method.assert_called_with(1, 2, test=3) - - start = dt_util.utcnow() - shifted_time = start + (timedelta(seconds=20 + 1)) - self.hass.bus.fire(ha.EVENT_TIME_CHANGED, - {ha.ATTR_NOW: shifted_time}) - self.hass.block_till_done() - self.assertEqual(mock_method.call_count, 1) - - mock_method.side_effect = Exception() - wrapped(1, 2, test=3) - self.assertEqual(mock_method.call_count, 2) - mock_method.assert_called_with(1, 2, test=3) - - for _ in range(3): - start = dt_util.utcnow() - shifted_time = start + (timedelta(seconds=20 + 1)) - self.hass.bus.fire(ha.EVENT_TIME_CHANGED, - {ha.ATTR_NOW: shifted_time}) - self.hass.block_till_done() - self.assertEqual(mock_method.call_count, 3) - mock_method.assert_called_with(1, 2, test=3) - - def test_multi_retry(self): - """Test that multiple retries work.""" - mock_method = MagicMock() - retryer = influxdb.RetryOnError(self.hass, retry_limit=4) - wrapped = retryer(mock_method) - mock_method.side_effect = Exception() - - wrapped(1, 2, test=3) - self.assertEqual(mock_method.call_count, 1) - mock_method.assert_called_with(1, 2, test=3) - - for cnt in range(3): - start = dt_util.utcnow() - shifted_time = start + (timedelta(seconds=20 + 1)) - self.hass.bus.fire(ha.EVENT_TIME_CHANGED, - {ha.ATTR_NOW: shifted_time}) - self.hass.block_till_done() - self.assertEqual(mock_method.call_count, cnt + 2) - mock_method.assert_called_with(1, 2, test=3) - - def test_max_queue(self): - """Test the maximum queue length.""" - # make a wrapped method - mock_method = MagicMock() - retryer = influxdb.RetryOnError( - self.hass, retry_limit=4, queue_limit=3) - wrapped = retryer(mock_method) - mock_method.side_effect = Exception() - - # call it once, call fails, queue fills to 1 - wrapped(1, 2, test=3) - self.assertEqual(mock_method.call_count, 1) - mock_method.assert_called_with(1, 2, test=3) - self.assertEqual(len(wrapped._retry_queue), 1) - - # two more calls that failed. queue is 3 - wrapped(1, 2, test=3) - wrapped(1, 2, test=3) - self.assertEqual(mock_method.call_count, 3) - self.assertEqual(len(wrapped._retry_queue), 3) - - # another call, queue gets limited to 3 - wrapped(1, 2, test=3) - self.assertEqual(mock_method.call_count, 4) - self.assertEqual(len(wrapped._retry_queue), 3) - - # time passes - start = dt_util.utcnow() - shifted_time = start + (timedelta(seconds=20 + 1)) - self.hass.bus.fire(ha.EVENT_TIME_CHANGED, - {ha.ATTR_NOW: shifted_time}) - self.hass.block_till_done() - - # only the three queued calls where repeated - self.assertEqual(mock_method.call_count, 7) - self.assertEqual(len(wrapped._retry_queue), 3) - - # another call, queue stays limited - wrapped(1, 2, test=3) - self.assertEqual(mock_method.call_count, 8) - self.assertEqual(len(wrapped._retry_queue), 3) - - # disable the side effect - mock_method.side_effect = None - - # time passes, all calls should succeed - start = dt_util.utcnow() - shifted_time = start + (timedelta(seconds=20 + 1)) - self.hass.bus.fire(ha.EVENT_TIME_CHANGED, - {ha.ATTR_NOW: shifted_time}) - self.hass.block_till_done() - - # three queued calls succeeded, queue empty. - self.assertEqual(mock_method.call_count, 11) - self.assertEqual(len(wrapped._retry_queue), 0) + mock_client.return_value.write_points.reset_mock()