Avoid influxdb filling connection pool (#12182)

* Add a processing queue to influxdb

* Updates after reviews

* Remove lint

* Move retry loop to thread class

* Move constant calculation out of loop

* Deprecate retry_queue_limit
This commit is contained in:
Anders Melchiorsen 2018-02-08 12:25:26 +01:00 committed by Pascal Vizeli
parent 702b1be985
commit 6265d1b747
2 changed files with 128 additions and 239 deletions
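The commit replaces the old RetryOnError decorator, which rescheduled failed writes inside the Home Assistant event loop, with a single worker thread draining a queue of state-changed events, so a slow or unreachable InfluxDB only blocks that thread instead of piling up scheduled callbacks and connections. A minimal, stand-alone sketch of that producer/consumer pattern (class and argument names here are illustrative, not the component's actual API):

import queue
import threading
import time


class WriteThread(threading.Thread):
    """Consume queued items and retry failed writes without blocking producers."""

    def __init__(self, handler, max_tries, retry_delay):
        super().__init__(name='writer')
        self.queue = queue.Queue()
        self.handler = handler          # returns True once the item is handled
        self.max_tries = max_tries
        self.retry_delay = retry_delay

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:            # sentinel: time to shut down
                self.queue.task_done()
                return
            for attempt in range(self.max_tries + 1):
                if self.handler(item):
                    break
                if attempt < self.max_tries:
                    time.sleep(self.retry_delay)
            self.queue.task_done()

    def stop(self):
        """Ask the thread to finish its backlog and exit."""
        self.queue.put(None)
        self.join()

Producers simply call thread.queue.put(event) and return immediately, which is exactly what the new bus listener in the diff below does.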


@@ -4,10 +4,11 @@ A component which allows you to send data to an Influx database.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/influxdb/
"""
from datetime import timedelta
from functools import partial, wraps
import logging
import re
import queue
import threading
import time
import requests.exceptions
import voluptuous as vol
@@ -15,11 +16,11 @@ import voluptuous as vol
from homeassistant.const import (
CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_HOST, CONF_INCLUDE,
CONF_PASSWORD, CONF_PORT, CONF_SSL, CONF_USERNAME, CONF_VERIFY_SSL,
EVENT_STATE_CHANGED, STATE_UNAVAILABLE, STATE_UNKNOWN)
EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_STOP, STATE_UNAVAILABLE,
STATE_UNKNOWN)
from homeassistant.helpers import state as state_helper
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_values import EntityValues
from homeassistant.util import utcnow
REQUIREMENTS = ['influxdb==5.0.0']
@@ -41,13 +42,15 @@ DEFAULT_VERIFY_SSL = True
DOMAIN = 'influxdb'
TIMEOUT = 5
RETRY_DELAY = 20
QUEUE_BACKLOG_SECONDS = 10
COMPONENT_CONFIG_SCHEMA_ENTRY = vol.Schema({
vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
})
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
DOMAIN: vol.All(cv.deprecated(CONF_RETRY_QUEUE), vol.Schema({
vol.Optional(CONF_HOST): cv.string,
vol.Inclusive(CONF_USERNAME, 'authentication'): cv.string,
vol.Inclusive(CONF_PASSWORD, 'authentication'): cv.string,
@@ -79,7 +82,7 @@ CONFIG_SCHEMA = vol.Schema({
vol.Schema({cv.string: COMPONENT_CONFIG_SCHEMA_ENTRY}),
vol.Optional(CONF_COMPONENT_CONFIG_DOMAIN, default={}):
vol.Schema({cv.string: COMPONENT_CONFIG_SCHEMA_ENTRY}),
}),
})),
}, extra=vol.ALLOW_EXTRA)
RE_DIGIT_TAIL = re.compile(r'^[^\.]*\d+\.?\d+[^\.]*$')
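The deprecated retry_queue option is handled by wrapping the domain schema in vol.All with cv.deprecated(CONF_RETRY_QUEUE), so a configuration that still sets it produces a warning instead of failing validation; vol.All applies its validators left to right, deprecation check first, schema second. A rough self-contained illustration with plain voluptuous (warn_deprecated is a hypothetical stand-in for Home Assistant's cv.deprecated helper):

import logging

import voluptuous as vol

_LOGGER = logging.getLogger(__name__)


def warn_deprecated(key):
    """Return a validator that logs when the deprecated key is still configured."""
    def validator(config):
        if key in config:
            _LOGGER.warning("Config option '%s' is deprecated and ignored", key)
        return config
    return validator


CONFIG = vol.All(
    warn_deprecated('retry_queue'),
    vol.Schema({
        vol.Optional('host'): str,
        vol.Optional('retry_queue'): int,   # still accepted, no longer used
    }),
)

CONFIG({'host': 'localhost', 'retry_queue': 20})   # warns, but still validates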
@@ -128,7 +131,6 @@ def setup(hass, config):
conf[CONF_COMPONENT_CONFIG_DOMAIN],
conf[CONF_COMPONENT_CONFIG_GLOB])
max_tries = conf.get(CONF_RETRY_COUNT)
queue_limit = conf.get(CONF_RETRY_QUEUE)
try:
influx = InfluxDBClient(**kwargs)
@@ -141,18 +143,18 @@ def setup(hass, config):
"READ/WRITE", exc)
return False
def influx_event_listener(event):
"""Listen for new messages on the bus and sends them to Influx."""
def influx_handle_event(event):
"""Send an event to Influx."""
state = event.data.get('new_state')
if state is None or state.state in (
STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \
state.entity_id in blacklist_e or state.domain in blacklist_d:
return
return True
try:
if (whitelist_e and state.entity_id not in whitelist_e) or \
(whitelist_d and state.domain not in whitelist_d):
return
return True
_include_state = _include_value = False
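The old listener is split in two: the filtering and formatting logic moves into influx_handle_event, which runs on the worker thread and signals the outcome through its return value, True when the event needs no further write attempt (skipped by the white/blacklists, written, or rejected by the server in a way a retry cannot fix) and False when the write failed transiently and should be retried. A rough sketch of that contract, with a generic write_points callable and ValueError/IOError standing in for the client's exception types:

def handle(point, write_points):
    """Return True when the point is dealt with, False to have the worker retry."""
    if point is None:                 # filtered out: nothing to do
        return True
    try:
        write_points([point])
    except ValueError:                # malformed data: retrying will not help
        return True
    except IOError:                   # connection trouble: worth retrying
        return False
    return True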
@@ -222,91 +224,78 @@ def setup(hass, config):
json_body[0]['tags'].update(tags)
_write_data(json_body)
@RetryOnError(hass, retry_limit=max_tries, retry_delay=20,
queue_limit=queue_limit)
def _write_data(json_body):
"""Write the data."""
try:
influx.write_points(json_body)
except exceptions.InfluxDBClientError:
_LOGGER.exception("Error saving event %s to InfluxDB", json_body)
return True
except (exceptions.InfluxDBClientError, IOError):
return False
hass.bus.listen(EVENT_STATE_CHANGED, influx_event_listener)
instance = hass.data[DOMAIN] = InfluxThread(
hass, influx_handle_event, max_tries)
instance.start()
def shutdown(event):
"""Shut down the thread."""
instance.queue.put(None)
instance.join()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
return True
class RetryOnError(object):
"""A class for retrying a failed task a certain amount of tries.
class InfluxThread(threading.Thread):
"""A threaded event handler class."""
This method decorator makes a method retry on errors. If there is an
uncaught exception, it schedules another attempt to execute the task after a
retry delay. It does this up to the maximum number of retries.
def __init__(self, hass, event_handler, max_tries):
"""Initialize the listener."""
threading.Thread.__init__(self, name='InfluxDB')
self.queue = queue.Queue()
self.event_handler = event_handler
self.max_tries = max_tries
hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
It can be used for all probable "self-healing" problems like network
outages. The task will be rescheduled using HA's scheduling mechanism.
def _event_listener(self, event):
"""Listen for new messages on the bus and queue them for Influx."""
item = (time.monotonic(), event)
self.queue.put(item)
It takes a Hass instance, a maximum number of retries and a retry delay
in seconds as arguments.
def run(self):
"""Process incoming events."""
queue_seconds = QUEUE_BACKLOG_SECONDS + self.max_tries*RETRY_DELAY
The queue limit defines the maximum number of calls that are allowed to
be queued at a time. If this number is reached, every new call discards
an old one.
"""
write_error = False
dropped = False
def __init__(self, hass, retry_limit=0, retry_delay=20, queue_limit=100):
"""Initialize the decorator."""
self.hass = hass
self.retry_limit = retry_limit
self.retry_delay = timedelta(seconds=retry_delay)
self.queue_limit = queue_limit
while True:
item = self.queue.get()
def __call__(self, method):
"""Decorate the target method."""
from homeassistant.helpers.event import track_point_in_utc_time
if item is None:
self.queue.task_done()
return
@wraps(method)
def wrapper(*args, **kwargs):
"""Wrap method."""
# pylint: disable=protected-access
if not hasattr(wrapper, "_retry_queue"):
wrapper._retry_queue = []
timestamp, event = item
age = time.monotonic() - timestamp
def scheduled(retry=0, untrack=None, event=None):
"""Call the target method.
if age < queue_seconds:
for retry in range(self.max_tries+1):
if self.event_handler(event):
if write_error:
_LOGGER.error("Resumed writing to InfluxDB")
write_error = False
dropped = False
break
elif retry < self.max_tries:
time.sleep(RETRY_DELAY)
elif not write_error:
_LOGGER.error("Error writing to InfluxDB")
write_error = True
elif not dropped:
_LOGGER.warning("Dropping old events to catch up")
dropped = True
It is called directly the first time and is then rescheduled
within the Hass mainloop.
"""
if untrack is not None:
wrapper._retry_queue.remove(untrack)
self.queue.task_done()
# pylint: disable=broad-except
try:
method(*args, **kwargs)
except Exception as ex:
if retry == self.retry_limit:
raise
if len(wrapper._retry_queue) >= self.queue_limit:
last = wrapper._retry_queue.pop(0)
if 'remove' in last:
func = last['remove']
func()
if 'exc' in last:
_LOGGER.error(
"Retry queue overflow, drop oldest entry: %s",
str(last['exc']))
target = utcnow() + self.retry_delay
tracking = {'target': target}
remove = track_point_in_utc_time(
self.hass, partial(scheduled, retry + 1, tracking),
target)
tracking['remove'] = remove
tracking["exc"] = ex
wrapper._retry_queue.append(tracking)
scheduled()
return wrapper
def block_till_done(self):
"""Block till all events processed."""
self.queue.join()
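Each queued item carries the time.monotonic() timestamp of when it was enqueued, and run() drops anything older than QUEUE_BACKLOG_SECONDS plus the worst-case retry time max_tries * RETRY_DELAY; with the constants above and, say, max_retries: 3 that window is 10 + 3 * 20 = 70 seconds. Pairing every get() with task_done() is what lets block_till_done() wait on Queue.join(). A condensed sketch of that bookkeeping, with illustrative names:

import time

QUEUE_BACKLOG_SECONDS = 10
RETRY_DELAY = 20


def max_age(max_tries):
    """Oldest event still worth writing: backlog window plus worst-case retries."""
    return QUEUE_BACKLOG_SECONDS + max_tries * RETRY_DELAY


def process_one(event_queue, handler, max_tries):
    """Handle one queued (timestamp, event) item, dropping it if it is stale."""
    timestamp, event = event_queue.get()
    if time.monotonic() - timestamp < max_age(max_tries):
        handler(event)                 # fresh enough: try to write it
    # stale items fall through and are discarded so the thread can catch up
    event_queue.task_done()            # pairs with event_queue.join()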


@@ -1,15 +1,10 @@
"""The tests for the InfluxDB component."""
import unittest
import datetime
import unittest
from unittest import mock
from datetime import timedelta
from unittest.mock import MagicMock
import influxdb as influx_client
from homeassistant.util import dt as dt_util
from homeassistant import core as ha
from homeassistant.setup import setup_component
import homeassistant.components.influxdb as influxdb
from homeassistant.const import EVENT_STATE_CHANGED, STATE_OFF, STATE_ON, \
@@ -169,6 +164,8 @@ class TestInfluxDB(unittest.TestCase):
body[0]['fields']['value'] = out[1]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
)
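Because the write now happens on the InfluxDB worker thread rather than inside the bus listener itself, every test flushes the queue before asserting on write_points; otherwise the mock may not have been called yet. The pattern repeated through the rest of this file is simply:

self.handler_method(event)                               # enqueue via the bus listener
self.hass.data[influxdb.DOMAIN].block_till_done()        # wait for the worker thread
self.assertEqual(mock_client.return_value.write_points.call_count, 1)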
@@ -203,6 +200,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
)
@@ -212,18 +210,6 @@ class TestInfluxDB(unittest.TestCase):
)
mock_client.return_value.write_points.reset_mock()
def test_event_listener_fail_write(self, mock_client):
"""Test the event listener for write failures."""
self._setup()
state = mock.MagicMock(
state=1, domain='fake', entity_id='fake.entity-id',
object_id='entity', attributes={})
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
mock_client.return_value.write_points.side_effect = \
influx_client.exceptions.InfluxDBClientError('foo')
self.handler_method(event)
def test_event_listener_states(self, mock_client):
"""Test the event listener against ignored states."""
self._setup()
@@ -245,6 +231,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
if state_state == 1:
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
@@ -278,6 +265,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
if entity_id == 'ok':
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
@@ -312,6 +300,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
if domain == 'ok':
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
@@ -356,6 +345,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
if entity_id == 'included':
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
@@ -401,6 +391,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
if domain == 'fake':
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
@@ -456,6 +447,7 @@ class TestInfluxDB(unittest.TestCase):
body[0]['fields']['value'] = out[1]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
)
@@ -498,6 +490,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
if entity_id == 'ok':
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
@@ -543,6 +536,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
)
@@ -588,6 +582,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
)
@@ -648,6 +643,7 @@ class TestInfluxDB(unittest.TestCase):
},
}]
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
self.assertEqual(
mock_client.return_value.write_points.call_count, 1
)
@@ -659,7 +655,16 @@ class TestInfluxDB(unittest.TestCase):
def test_scheduled_write(self, mock_client):
"""Test the event listener to retry after write failures."""
self._setup(max_retries=1)
config = {
'influxdb': {
'host': 'host',
'username': 'user',
'password': 'pass',
'max_retries': 1
}
}
assert setup_component(self.hass, influxdb.DOMAIN, config)
self.handler_method = self.hass.bus.listen.call_args_list[0][0][1]
state = mock.MagicMock(
state=1, domain='fake', entity_id='entity.id', object_id='entity',
@@ -668,152 +673,47 @@ class TestInfluxDB(unittest.TestCase):
mock_client.return_value.write_points.side_effect = \
IOError('foo')
start = dt_util.utcnow()
self.handler_method(event)
# Write fails
with mock.patch.object(influxdb.time, 'sleep') as mock_sleep:
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
assert mock_sleep.called
json_data = mock_client.return_value.write_points.call_args[0][0]
self.assertEqual(mock_client.return_value.write_points.call_count, 1)
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_client.return_value.write_points.call_count, 2)
mock_client.return_value.write_points.assert_called_with(json_data)
shifted_time = shifted_time + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_client.return_value.write_points.call_count, 2)
# Write works again
mock_client.return_value.write_points.side_effect = None
with mock.patch.object(influxdb.time, 'sleep') as mock_sleep:
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
assert not mock_sleep.called
self.assertEqual(mock_client.return_value.write_points.call_count, 3)
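The rewritten retry test patches influxdb.time.sleep so the 20-second RETRY_DELAY between attempts costs nothing during the test run, and the mock doubles as evidence that the retry path was taken (or, once writes succeed again, that it was not). The same trick in miniature, with an illustrative retry helper rather than the component's own code:

import time
from unittest import mock


def write_with_retry(write, max_tries, retry_delay):
    """Call write() until it returns True, sleeping between attempts."""
    for attempt in range(max_tries + 1):
        if write():
            return True
        if attempt < max_tries:
            time.sleep(retry_delay)
    return False


with mock.patch('time.sleep') as mock_sleep:
    attempts = iter([False, True])                       # first write fails, second works
    assert write_with_retry(lambda: next(attempts), max_tries=1, retry_delay=20)
    assert mock_sleep.called                             # a retry actually happened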
def test_queue_backlog_full(self, mock_client):
"""Test the event listener to drop old events."""
self._setup()
class TestRetryOnErrorDecorator(unittest.TestCase):
"""Test the RetryOnError decorator."""
state = mock.MagicMock(
state=1, domain='fake', entity_id='entity.id', object_id='entity',
attributes={})
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
def setUp(self):
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
monotonic_time = 0
def tearDown(self):
"""Clear data."""
self.hass.stop()
def fast_monotonic():
"""Monotonic time that ticks fast enough to cause a timeout."""
nonlocal monotonic_time
monotonic_time += 60
return monotonic_time
def test_no_retry(self):
"""Test that it does not retry if configured."""
mock_method = MagicMock()
wrapped = influxdb.RetryOnError(self.hass)(mock_method)
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 1)
mock_method.assert_called_with(1, 2, test=3)
with mock.patch('homeassistant.components.influxdb.time.monotonic',
new=fast_monotonic):
self.handler_method(event)
self.hass.data[influxdb.DOMAIN].block_till_done()
mock_method.side_effect = Exception()
self.assertRaises(Exception, wrapped, 1, 2, test=3)
self.assertEqual(mock_method.call_count, 2)
mock_method.assert_called_with(1, 2, test=3)
self.assertEqual(
mock_client.return_value.write_points.call_count, 0
)
def test_single_retry(self):
"""Test that retry stops after a single try if configured."""
mock_method = MagicMock()
retryer = influxdb.RetryOnError(self.hass, retry_limit=1)
wrapped = retryer(mock_method)
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 1)
mock_method.assert_called_with(1, 2, test=3)
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_method.call_count, 1)
mock_method.side_effect = Exception()
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 2)
mock_method.assert_called_with(1, 2, test=3)
for _ in range(3):
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_method.call_count, 3)
mock_method.assert_called_with(1, 2, test=3)
def test_multi_retry(self):
"""Test that multiple retries work."""
mock_method = MagicMock()
retryer = influxdb.RetryOnError(self.hass, retry_limit=4)
wrapped = retryer(mock_method)
mock_method.side_effect = Exception()
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 1)
mock_method.assert_called_with(1, 2, test=3)
for cnt in range(3):
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
self.assertEqual(mock_method.call_count, cnt + 2)
mock_method.assert_called_with(1, 2, test=3)
def test_max_queue(self):
"""Test the maximum queue length."""
# make a wrapped method
mock_method = MagicMock()
retryer = influxdb.RetryOnError(
self.hass, retry_limit=4, queue_limit=3)
wrapped = retryer(mock_method)
mock_method.side_effect = Exception()
# call it once, call fails, queue fills to 1
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 1)
mock_method.assert_called_with(1, 2, test=3)
self.assertEqual(len(wrapped._retry_queue), 1)
# two more calls that failed. queue is 3
wrapped(1, 2, test=3)
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 3)
self.assertEqual(len(wrapped._retry_queue), 3)
# another call, queue gets limited to 3
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 4)
self.assertEqual(len(wrapped._retry_queue), 3)
# time passes
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
# only the three queued calls were repeated
self.assertEqual(mock_method.call_count, 7)
self.assertEqual(len(wrapped._retry_queue), 3)
# another call, queue stays limited
wrapped(1, 2, test=3)
self.assertEqual(mock_method.call_count, 8)
self.assertEqual(len(wrapped._retry_queue), 3)
# disable the side effect
mock_method.side_effect = None
# time passes, all calls should succeed
start = dt_util.utcnow()
shifted_time = start + (timedelta(seconds=20 + 1))
self.hass.bus.fire(ha.EVENT_TIME_CHANGED,
{ha.ATTR_NOW: shifted_time})
self.hass.block_till_done()
# three queued calls succeeded, queue empty.
self.assertEqual(mock_method.call_count, 11)
self.assertEqual(len(wrapped._retry_queue), 0)
mock_client.return_value.write_points.reset_mock()
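test_queue_backlog_full works by patching homeassistant.components.influxdb.time.monotonic with a clock that jumps 60 seconds per call, so the single queued event already looks older than the backlog window by the time the worker inspects it and is dropped without write_points ever being called. The core of that trick in isolation (the fake-clock helper is just for illustration):

import time
from unittest import mock


def make_fast_clock(step=60):
    """Return a monotonic-clock stand-in that advances `step` seconds per call."""
    state = {'now': 0}

    def clock():
        state['now'] += step
        return state['now']

    return clock


with mock.patch('time.monotonic', new=make_fast_clock()):
    enqueued = time.monotonic()            # "now" when the event is queued
    age = time.monotonic() - enqueued      # 60 s have "passed" by the next call
    assert age >= 60                       # old enough to be dropped as stale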