diff --git a/homeassistant/components/zabbix/__init__.py b/homeassistant/components/zabbix/__init__.py index a1b4327470f..0afe8050d63 100644 --- a/homeassistant/components/zabbix/__init__.py +++ b/homeassistant/components/zabbix/__init__.py @@ -1,8 +1,14 @@ """Support for Zabbix.""" +import json import logging +import math +import queue +import threading +import time +from urllib.error import HTTPError from urllib.parse import urljoin -from pyzabbix import ZabbixAPI, ZabbixAPIException +from pyzabbix import ZabbixAPI, ZabbixAPIException, ZabbixMetric, ZabbixSender import voluptuous as vol from homeassistant.const import ( @@ -11,24 +17,45 @@ from homeassistant.const import ( CONF_PATH, CONF_SSL, CONF_USERNAME, + EVENT_HOMEASSISTANT_STOP, + EVENT_STATE_CHANGED, + STATE_UNAVAILABLE, + STATE_UNKNOWN, ) +from homeassistant.helpers import event as event_helper, state as state_helper import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.entityfilter import ( + INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA, + convert_include_exclude_filter, +) _LOGGER = logging.getLogger(__name__) +CONF_PUBLISH_STATES_HOST = "publish_states_host" + DEFAULT_SSL = False DEFAULT_PATH = "zabbix" DOMAIN = "zabbix" +TIMEOUT = 5 +RETRY_DELAY = 20 +QUEUE_BACKLOG_SECONDS = 30 +RETRY_INTERVAL = 60 # seconds +RETRY_MESSAGE = f"%s Retrying in {RETRY_INTERVAL} seconds." + +BATCH_TIMEOUT = 1 +BATCH_BUFFER_SIZE = 100 + CONFIG_SCHEMA = vol.Schema( { - DOMAIN: vol.Schema( + DOMAIN: INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend( { vol.Required(CONF_HOST): cv.string, vol.Optional(CONF_PASSWORD): cv.string, vol.Optional(CONF_PATH, default=DEFAULT_PATH): cv.string, vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean, vol.Optional(CONF_USERNAME): cv.string, + vol.Optional(CONF_PUBLISH_STATES_HOST): cv.string, } ) }, @@ -46,13 +73,188 @@ def setup(hass, config): username = conf.get(CONF_USERNAME) password = conf.get(CONF_PASSWORD) - zapi = ZabbixAPI(url) + publish_states_host = conf.get(CONF_PUBLISH_STATES_HOST) + + entities_filter = convert_include_exclude_filter(conf) + try: - zapi.login(username, password) + zapi = ZabbixAPI(url=url, user=username, password=password) _LOGGER.info("Connected to Zabbix API Version %s", zapi.api_version()) except ZabbixAPIException as login_exception: _LOGGER.error("Unable to login to the Zabbix API: %s", login_exception) return False + except HTTPError as http_error: + _LOGGER.error("HTTPError when connecting to Zabbix API: %s", http_error) + zapi = None + _LOGGER.error(RETRY_MESSAGE, http_error) + event_helper.call_later(hass, RETRY_INTERVAL, lambda _: setup(hass, config)) + return True hass.data[DOMAIN] = zapi + + def event_to_metrics(event, float_keys, string_keys): + """Add an event to the outgoing Zabbix list.""" + state = event.data.get("new_state") + if state is None or state.state in (STATE_UNKNOWN, "", STATE_UNAVAILABLE): + return + + entity_id = state.entity_id + if not entities_filter(entity_id): + return + + floats = {} + strings = {} + try: + _state_as_value = float(state.state) + floats[entity_id] = _state_as_value + except ValueError: + try: + _state_as_value = float(state_helper.state_as_number(state)) + floats[entity_id] = _state_as_value + except ValueError: + strings[entity_id] = state.state + + for key, value in state.attributes.items(): + # For each value we try to cast it as float + # But if we can not do it we store the value + # as string + attribute_id = f"{entity_id}/{key}" + try: + float_value = float(value) + except (ValueError, TypeError): + float_value = None + if float_value is None or not math.isfinite(float_value): + strings[attribute_id] = str(value) + else: + floats[attribute_id] = float_value + + metrics = [] + float_keys_count = len(float_keys) + float_keys.update(floats) + if len(float_keys) != float_keys_count: + floats_discovery = [] + for float_key in float_keys: + floats_discovery.append({"{#KEY}": float_key}) + metric = ZabbixMetric( + publish_states_host, + "homeassistant.floats_discovery", + json.dumps(floats_discovery), + ) + metrics.append(metric) + for key, value in floats.items(): + metric = ZabbixMetric( + publish_states_host, f"homeassistant.float[{key}]", value + ) + metrics.append(metric) + + string_keys.update(strings) + return metrics + + if publish_states_host: + zabbix_sender = ZabbixSender(zabbix_server=conf[CONF_HOST]) + instance = ZabbixThread(hass, zabbix_sender, event_to_metrics) + instance.setup(hass) + return True + + +class ZabbixThread(threading.Thread): + """A threaded event handler class.""" + + MAX_TRIES = 3 + + def __init__(self, hass, zabbix_sender, event_to_metrics): + """Initialize the listener.""" + threading.Thread.__init__(self, name="Zabbix") + self.queue = queue.Queue() + self.zabbix_sender = zabbix_sender + self.event_to_metrics = event_to_metrics + self.write_errors = 0 + self.shutdown = False + self.float_keys = set() + self.string_keys = set() + + def setup(self, hass): + """Set up the thread and start it.""" + hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener) + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self._shutdown) + self.start() + _LOGGER.debug("Started publishing state changes to Zabbix") + + def _shutdown(self, event): + """Shut down the thread.""" + self.queue.put(None) + self.join() + + def _event_listener(self, event): + """Listen for new messages on the bus and queue them for Zabbix.""" + item = (time.monotonic(), event) + self.queue.put(item) + + def get_metrics(self): + """Return a batch of events formatted for writing.""" + queue_seconds = QUEUE_BACKLOG_SECONDS + self.MAX_TRIES * RETRY_DELAY + + count = 0 + metrics = [] + + dropped = 0 + + try: + while len(metrics) < BATCH_BUFFER_SIZE and not self.shutdown: + timeout = None if count == 0 else BATCH_TIMEOUT + item = self.queue.get(timeout=timeout) + count += 1 + + if item is None: + self.shutdown = True + else: + timestamp, event = item + age = time.monotonic() - timestamp + + if age < queue_seconds: + event_metrics = self.event_to_metrics( + event, self.float_keys, self.string_keys + ) + if event_metrics: + metrics += event_metrics + else: + dropped += 1 + + except queue.Empty: + pass + + if dropped: + _LOGGER.warning("Catching up, dropped %d old events", dropped) + + return count, metrics + + def write_to_zabbix(self, metrics): + """Write preprocessed events to zabbix, with retry.""" + + for retry in range(self.MAX_TRIES + 1): + try: + self.zabbix_sender.send(metrics) + + if self.write_errors: + _LOGGER.error("Resumed, lost %d events", self.write_errors) + self.write_errors = 0 + + _LOGGER.debug("Wrote %d metrics", len(metrics)) + break + except OSError as err: + if retry < self.MAX_TRIES: + time.sleep(RETRY_DELAY) + else: + if not self.write_errors: + _LOGGER.error("Write error: %s", err) + self.write_errors += len(metrics) + + def run(self): + """Process incoming events.""" + while not self.shutdown: + count, metrics = self.get_metrics() + if metrics: + self.write_to_zabbix(metrics) + for _ in range(count): + self.queue.task_done() diff --git a/homeassistant/components/zabbix/manifest.json b/homeassistant/components/zabbix/manifest.json index 08dfb98d5fa..5ed2e7c163d 100644 --- a/homeassistant/components/zabbix/manifest.json +++ b/homeassistant/components/zabbix/manifest.json @@ -2,6 +2,6 @@ "domain": "zabbix", "name": "Zabbix", "documentation": "https://www.home-assistant.io/integrations/zabbix", - "requirements": ["pyzabbix==0.7.4"], + "requirements": ["py-zabbix==1.1.7"], "codeowners": [] } diff --git a/requirements_all.txt b/requirements_all.txt index 1a86a0a5be2..0aa29d30307 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1199,6 +1199,9 @@ py-schluter==0.1.7 # homeassistant.components.synology py-synology==0.2.0 +# homeassistant.components.zabbix +py-zabbix==1.1.7 + # homeassistant.components.seventeentrack py17track==2.2.2 @@ -1877,9 +1880,6 @@ pywilight==0.0.65 # homeassistant.components.xeoma pyxeoma==1.4.1 -# homeassistant.components.zabbix -pyzabbix==0.7.4 - # homeassistant.components.qrcode pyzbar==0.1.7