# Patch summary (free text ahead of the first "diff --git" header).
#
# homeassistant/components/influxdb.py
#   * QUEUE_BACKLOG_SECONDS goes from 10 to 30; BATCH_TIMEOUT = 1 and
#     BATCH_BUFFER_SIZE = 100 are added.
#   * influx_handle_event(), which wrote one point per event and returned
#     True/False, becomes event_to_json(): it returns the point dict, or
#     None (bare return) when the state is filtered out.
#   * InfluxThread now receives the influx client and the converter.
#     get_events_json() blocks for the first queue item, then waits
#     batch_timeout() seconds for each further item, and stops when
#     BATCH_BUFFER_SIZE points are collected, the queue is empty, or the
#     None shutdown sentinel is dequeued. Items older than
#     QUEUE_BACKLOG_SECONDS + max_tries*RETRY_DELAY are counted as dropped.
#   * write_to_influxdb() writes the whole batch, sleeping RETRY_DELAY
#     between attempts; after the last failed attempt it adds the batch
#     size to write_errors (logging only when write_errors was 0), and the
#     next successful write logs the lost count and resets it.
#   * run() calls queue.task_done() once per dequeued item.
#
# tests/components/test_influxdb.py
#   * InfluxThread.batch_timeout is patched to return 0 for the test class.
#
# NOTE(review): line breaks and indentation below were restored from a
# whitespace-collapsed copy; blank context lines are inferred -- confirm
# against the repository before applying.

diff --git a/homeassistant/components/influxdb.py b/homeassistant/components/influxdb.py
index 526b8057ce1..08c94d54361 100644
--- a/homeassistant/components/influxdb.py
+++ b/homeassistant/components/influxdb.py
@@ -43,7 +43,10 @@ DOMAIN = 'influxdb'
 
 TIMEOUT = 5
 RETRY_DELAY = 20
-QUEUE_BACKLOG_SECONDS = 10
+QUEUE_BACKLOG_SECONDS = 30
+
+BATCH_TIMEOUT = 1
+BATCH_BUFFER_SIZE = 100
 
 COMPONENT_CONFIG_SCHEMA_ENTRY = vol.Schema({
     vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
@@ -143,18 +146,18 @@ def setup(hass, config):
                       "READ/WRITE", exc)
         return False
 
-    def influx_handle_event(event):
-        """Send an event to Influx."""
+    def event_to_json(event):
+        """Add an event to the outgoing Influx list."""
         state = event.data.get('new_state')
         if state is None or state.state in (
                 STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \
                 state.entity_id in blacklist_e or state.domain in blacklist_d:
-            return True
+            return
 
         try:
             if (whitelist_e and state.entity_id not in whitelist_e) or \
                     (whitelist_d and state.domain not in whitelist_d):
-                return True
+                return
 
             _include_state = _include_value = False
 
@@ -183,55 +186,48 @@ def setup(hass, config):
                 else:
                     include_uom = False
 
-        json_body = [
-            {
-                'measurement': measurement,
-                'tags': {
-                    'domain': state.domain,
-                    'entity_id': state.object_id,
-                },
-                'time': event.time_fired,
-                'fields': {
-                }
-            }
-        ]
+        json = {
+            'measurement': measurement,
+            'tags': {
+                'domain': state.domain,
+                'entity_id': state.object_id,
+            },
+            'time': event.time_fired,
+            'fields': {}
+        }
         if _include_state:
-            json_body[0]['fields']['state'] = state.state
+            json['fields']['state'] = state.state
         if _include_value:
-            json_body[0]['fields']['value'] = _state_as_value
+            json['fields']['value'] = _state_as_value
 
         for key, value in state.attributes.items():
             if key in tags_attributes:
-                json_body[0]['tags'][key] = value
+                json['tags'][key] = value
             elif key != 'unit_of_measurement' or include_uom:
                 # If the key is already in fields
-                if key in json_body[0]['fields']:
+                if key in json['fields']:
                     key = key + "_"
                 # Prevent column data errors in influxDB.
                 # For each value we try to cast it as float
                 # But if we can not do it we store the value
                 # as string add "_str" postfix to the field key
                 try:
-                    json_body[0]['fields'][key] = float(value)
+                    json['fields'][key] = float(value)
                 except (ValueError, TypeError):
                     new_key = "{}_str".format(key)
                     new_value = str(value)
-                    json_body[0]['fields'][new_key] = new_value
+                    json['fields'][new_key] = new_value
 
                     if RE_DIGIT_TAIL.match(new_value):
-                        json_body[0]['fields'][key] = float(
+                        json['fields'][key] = float(
                             RE_DECIMAL.sub('', new_value))
 
-        json_body[0]['tags'].update(tags)
+        json['tags'].update(tags)
 
-        try:
-            influx.write_points(json_body)
-            return True
-        except (exceptions.InfluxDBClientError, IOError):
-            return False
+        return json
 
     instance = hass.data[DOMAIN] = InfluxThread(
-        hass, influx_handle_event, max_tries)
+        hass, influx, event_to_json, max_tries)
     instance.start()
 
     def shutdown(event):
@@ -247,12 +243,15 @@ def setup(hass, config):
 class InfluxThread(threading.Thread):
     """A threaded event handler class."""
 
-    def __init__(self, hass, event_handler, max_tries):
+    def __init__(self, hass, influx, event_to_json, max_tries):
         """Initialize the listener."""
         threading.Thread.__init__(self, name='InfluxDB')
         self.queue = queue.Queue()
-        self.event_handler = event_handler
+        self.influx = influx
+        self.event_to_json = event_to_json
         self.max_tries = max_tries
+        self.write_errors = 0
+        self.shutdown = False
         hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
 
     def _event_listener(self, event):
@@ -260,41 +259,77 @@ class InfluxThread(threading.Thread):
         item = (time.monotonic(), event)
         self.queue.put(item)
 
-    def run(self):
-        """Process incoming events."""
+    @staticmethod
+    def batch_timeout():
+        """Return number of seconds to wait for more events."""
+        return BATCH_TIMEOUT
+
+    def get_events_json(self):
+        """Return a batch of events formatted for writing."""
         queue_seconds = QUEUE_BACKLOG_SECONDS + self.max_tries*RETRY_DELAY
 
-        write_error = False
-        dropped = False
+        count = 0
+        json = []
 
-        while True:
-            item = self.queue.get()
+        dropped = 0
 
-            if item is None:
+        try:
+            while len(json) < BATCH_BUFFER_SIZE and not self.shutdown:
+                timeout = None if count == 0 else self.batch_timeout()
+                item = self.queue.get(timeout=timeout)
+                count += 1
+
+                if item is None:
+                    self.shutdown = True
+                else:
+                    timestamp, event = item
+                    age = time.monotonic() - timestamp
+
+                    if age < queue_seconds:
+                        event_json = self.event_to_json(event)
+                        if event_json:
+                            json.append(event_json)
+                    else:
+                        dropped += 1
+
+        except queue.Empty:
+            pass
+
+        if dropped:
+            _LOGGER.warning("Catching up, dropped %d old events", dropped)
+
+        return count, json
+
+    def write_to_influxdb(self, json):
+        """Write preprocessed events to influxdb, with retry."""
+        from influxdb import exceptions
+
+        for retry in range(self.max_tries+1):
+            try:
+                self.influx.write_points(json)
+
+                if self.write_errors:
+                    _LOGGER.error("Resumed, lost %d events", self.write_errors)
+                    self.write_errors = 0
+
+                _LOGGER.debug("Wrote %d events", len(json))
+                break
+            except (exceptions.InfluxDBClientError, IOError):
+                if retry < self.max_tries:
+                    time.sleep(RETRY_DELAY)
+                else:
+                    if not self.write_errors:
+                        _LOGGER.exception("Write error")
+                    self.write_errors += len(json)
+
+    def run(self):
+        """Process incoming events."""
+        while not self.shutdown:
+            count, json = self.get_events_json()
+            if json:
+                self.write_to_influxdb(json)
+            for _ in range(count):
                 self.queue.task_done()
-                return
-
-            timestamp, event = item
-            age = time.monotonic() - timestamp
-
-            if age < queue_seconds:
-                for retry in range(self.max_tries+1):
-                    if self.event_handler(event):
-                        if write_error:
-                            _LOGGER.error("Resumed writing to InfluxDB")
-                            write_error = False
-                            dropped = False
-                        break
-                    elif retry < self.max_tries:
-                        time.sleep(RETRY_DELAY)
-                    elif not write_error:
-                        _LOGGER.error("Error writing to InfluxDB")
-                        write_error = True
-            elif not dropped:
-                _LOGGER.warning("Dropping old events to catch up")
-                dropped = True
-
-            self.queue.task_done()
 
     def block_till_done(self):
         """Block till all events processed."""
diff --git a/tests/components/test_influxdb.py b/tests/components/test_influxdb.py
index 4d12e436c02..feaa0f35622 100644
--- a/tests/components/test_influxdb.py
+++ b/tests/components/test_influxdb.py
@@ -14,6 +14,9 @@ from tests.common import get_test_home_assistant
 
 
 @mock.patch('influxdb.InfluxDBClient')
+@mock.patch(
+    'homeassistant.components.influxdb.InfluxThread.batch_timeout',
+    mock.Mock(return_value=0))
 class TestInfluxDB(unittest.TestCase):
     """Test the InfluxDB component."""
 