From 69cc6affd5d3a331bf4376a018b6f666d0868ddc Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Fri, 12 Jul 2019 05:13:51 -0600 Subject: [PATCH] Add support for recording history to Apache Kafka (#25085) * Add support for Apache Kafka * Simplified * Revert "Simplified" This reverts commit fde4624e07a44edcaa728666777e7cfd789d68f6. * Revert "Revert "Simplified"" This reverts commit 5ae57e64c29f9e1cdbba6545420f4c2003d664e2. * Completed * Updated requirements * Updated .coveragerc * Removed unused import * Updated codeowner --- .coveragerc | 1 + CODEOWNERS | 1 + .../components/apache_kafka/__init__.py | 111 ++++++++++++++++++ .../components/apache_kafka/manifest.json | 12 ++ requirements_all.txt | 3 + 5 files changed, 128 insertions(+) create mode 100644 homeassistant/components/apache_kafka/__init__.py create mode 100644 homeassistant/components/apache_kafka/manifest.json diff --git a/.coveragerc b/.coveragerc index 781b5d17279..ff02e55d1f9 100644 --- a/.coveragerc +++ b/.coveragerc @@ -34,6 +34,7 @@ omit = homeassistant/components/androidtv/* homeassistant/components/anel_pwrctrl/switch.py homeassistant/components/anthemav/media_player.py + homeassistant/components/apache_kafka/* homeassistant/components/apcupsd/* homeassistant/components/apple_tv/* homeassistant/components/aqualogic/* diff --git a/CODEOWNERS b/CODEOWNERS index 8117968cf11..66d2b4ef8f5 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -24,6 +24,7 @@ homeassistant/components/alpha_vantage/* @fabaff homeassistant/components/amazon_polly/* @robbiet480 homeassistant/components/ambiclimate/* @danielhiversen homeassistant/components/ambient_station/* @bachya +homeassistant/components/apache_kafka/* @bachya homeassistant/components/api/* @home-assistant/core homeassistant/components/aprs/* @PhilRW homeassistant/components/arcam_fmj/* @elupus diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py new file mode 100644 index 00000000000..e8617eaf317 --- /dev/null +++ b/homeassistant/components/apache_kafka/__init__.py @@ -0,0 +1,111 @@ +"""Support for Apache Kafka.""" +from datetime import datetime +import json +import logging + +from aiokafka import AIOKafkaProducer +import voluptuous as vol + +from homeassistant.const import ( + CONF_IP_ADDRESS, CONF_PORT, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, + STATE_UNAVAILABLE, STATE_UNKNOWN) +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.entityfilter import FILTER_SCHEMA + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = 'apache_kafka' + +CONF_FILTER = 'filter' +CONF_TOPIC = 'topic' + +CONFIG_SCHEMA = vol.Schema({ + DOMAIN: vol.Schema({ + vol.Required(CONF_IP_ADDRESS): cv.string, + vol.Required(CONF_PORT): cv.port, + vol.Required(CONF_TOPIC): cv.string, + vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA, + }), +}, extra=vol.ALLOW_EXTRA) + + +async def async_setup(hass, config): + """Activate the Apache Kafka integration.""" + conf = config[DOMAIN] + + kafka = hass.data[DOMAIN] = KafkaManager( + hass, + conf[CONF_IP_ADDRESS], + conf[CONF_PORT], + conf[CONF_TOPIC], + conf[CONF_FILTER]) + + hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown()) + + await kafka.start() + + return True + + +class DateTimeJSONEncoder(json.JSONEncoder): + """Encode python objects. + + Additionally add encoding for datetime objects as isoformat. + """ + + def default(self, o): # pylint: disable=E0202 + """Implement encoding logic.""" + if isinstance(o, datetime): + return o.isoformat() + return super().default(o) + + +class KafkaManager: + """Define a manager to buffer events to Kafka.""" + + def __init__( + self, + hass, + ip_address, + port, + topic, + entities_filter): + """Initialize.""" + self._encoder = DateTimeJSONEncoder() + self._entities_filter = entities_filter + self._hass = hass + self._producer = AIOKafkaProducer( + loop=hass.loop, + bootstrap_servers="{0}:{1}".format(ip_address, port), + compression_type="gzip", + ) + self._topic = topic + + def _encode_event(self, event): + """Translate events into a binary JSON payload.""" + state = event.data.get('new_state') + if (state is None + or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) + or not self._entities_filter(state.entity_id)): + return + + return json.dumps( + obj=state.as_dict(), + default=self._encoder.encode + ).encode('utf-8') + + async def start(self): + """Start the Kafka manager.""" + self._hass.bus.async_listen(EVENT_STATE_CHANGED, self.write) + await self._producer.start() + + async def shutdown(self): + """Shut the manager down.""" + await self._producer.stop() + + async def write(self, event): + """Write a binary payload to Kafka.""" + payload = self._encode_event(event) + + if payload: + await self._producer.send_and_wait(self._topic, payload) diff --git a/homeassistant/components/apache_kafka/manifest.json b/homeassistant/components/apache_kafka/manifest.json new file mode 100644 index 00000000000..ac36af7fa48 --- /dev/null +++ b/homeassistant/components/apache_kafka/manifest.json @@ -0,0 +1,12 @@ +{ + "domain": "apache_kafka", + "name": "Apache Kafka", + "documentation": "https://www.home-assistant.io/components/apache_kafka", + "requirements": [ + "aiokafka==0.5.1" + ], + "dependencies": [], + "codeowners": [ + "@bachya" + ] +} diff --git a/requirements_all.txt b/requirements_all.txt index b0a034b36a1..fcfcca0c574 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -150,6 +150,9 @@ aiohue==1.9.1 # homeassistant.components.imap aioimaplib==0.7.15 +# homeassistant.components.apache_kafka +aiokafka==0.5.1 + # homeassistant.components.lifx aiolifx==0.6.7