diff --git a/homeassistant/components/azure_event_hub/__init__.py b/homeassistant/components/azure_event_hub/__init__.py index cc59790b646..3b44c6423be 100644 --- a/homeassistant/components/azure_event_hub/__init__.py +++ b/homeassistant/components/azure_event_hub/__init__.py @@ -1,89 +1,223 @@ """Support for Azure Event Hubs.""" +import asyncio import json import logging +import time from typing import Any, Dict -from azure.eventhub import EventData, EventHubClientAsync +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential +from azure.eventhub.exceptions import EventHubError import voluptuous as vol from homeassistant.const import ( EVENT_HOMEASSISTANT_STOP, - EVENT_STATE_CHANGED, + MATCH_ALL, STATE_UNAVAILABLE, STATE_UNKNOWN, ) from homeassistant.core import Event, HomeAssistant import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entityfilter import FILTER_SCHEMA +from homeassistant.helpers.event import async_call_later from homeassistant.helpers.json import JSONEncoder +from .const import ( + ADDITIONAL_ARGS, + CONF_EVENT_HUB_CON_STRING, + CONF_EVENT_HUB_INSTANCE_NAME, + CONF_EVENT_HUB_NAMESPACE, + CONF_EVENT_HUB_SAS_KEY, + CONF_EVENT_HUB_SAS_POLICY, + CONF_FILTER, + CONF_MAX_DELAY, + CONF_SEND_INTERVAL, + DOMAIN, +) + _LOGGER = logging.getLogger(__name__) -DOMAIN = "azure_event_hub" - -CONF_EVENT_HUB_NAMESPACE = "event_hub_namespace" -CONF_EVENT_HUB_INSTANCE_NAME = "event_hub_instance_name" -CONF_EVENT_HUB_SAS_POLICY = "event_hub_sas_policy" -CONF_EVENT_HUB_SAS_KEY = "event_hub_sas_key" -CONF_FILTER = "filter" - CONFIG_SCHEMA = vol.Schema( { DOMAIN: vol.Schema( { - vol.Required(CONF_EVENT_HUB_NAMESPACE): cv.string, - vol.Required(CONF_EVENT_HUB_INSTANCE_NAME): cv.string, - vol.Required(CONF_EVENT_HUB_SAS_POLICY): cv.string, - vol.Required(CONF_EVENT_HUB_SAS_KEY): cv.string, - vol.Required(CONF_FILTER): FILTER_SCHEMA, - } + vol.Exclusive(CONF_EVENT_HUB_CON_STRING, "setup_methods"): cv.string, + vol.Exclusive(CONF_EVENT_HUB_NAMESPACE, "setup_methods"): cv.string, + vol.Optional(CONF_EVENT_HUB_INSTANCE_NAME): cv.string, + vol.Optional(CONF_EVENT_HUB_SAS_POLICY): cv.string, + vol.Optional(CONF_EVENT_HUB_SAS_KEY): cv.string, + vol.Optional(CONF_SEND_INTERVAL, default=5): cv.positive_int, + vol.Optional(CONF_MAX_DELAY, default=30): cv.positive_int, + vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA, + }, + cv.has_at_least_one_key( + CONF_EVENT_HUB_CON_STRING, CONF_EVENT_HUB_NAMESPACE + ), ) }, extra=vol.ALLOW_EXTRA, ) -async def async_setup(hass: HomeAssistant, yaml_config: Dict[str, Any]): +async def async_setup(hass, yaml_config): """Activate Azure EH component.""" config = yaml_config[DOMAIN] + if config.get(CONF_EVENT_HUB_CON_STRING): + client_args = {"conn_str": config[CONF_EVENT_HUB_CON_STRING]} + conn_str_client = True + else: + client_args = { + "fully_qualified_namespace": f"{config[CONF_EVENT_HUB_NAMESPACE]}.servicebus.windows.net", + "credential": EventHubSharedKeyCredential( + policy=config[CONF_EVENT_HUB_SAS_POLICY], + key=config[CONF_EVENT_HUB_SAS_KEY], + ), + "eventhub_name": config[CONF_EVENT_HUB_INSTANCE_NAME], + } + conn_str_client = False - event_hub_address = ( - f"amqps://{config[CONF_EVENT_HUB_NAMESPACE]}" - f".servicebus.windows.net/{config[CONF_EVENT_HUB_INSTANCE_NAME]}" + instance = hass.data[DOMAIN] = AzureEventHub( + hass, + client_args, + conn_str_client, + config[CONF_FILTER], + config[CONF_SEND_INTERVAL], + config[CONF_MAX_DELAY], ) - entities_filter = config[CONF_FILTER] - client = EventHubClientAsync( - event_hub_address, - debug=True, - username=config[CONF_EVENT_HUB_SAS_POLICY], - password=config[CONF_EVENT_HUB_SAS_KEY], - ) - async_sender = client.add_async_sender() - await client.run_async() + hass.async_create_task(instance.async_start()) + return True - encoder = JSONEncoder() - async def async_send_to_event_hub(event: Event): - """Send states to Event Hub.""" +class AzureEventHub: + """A event handler class for Azure Event Hub.""" + + def __init__( + self, + hass: HomeAssistant, + client_args: Dict[str, Any], + conn_str_client: bool, + entities_filter: vol.Schema, + send_interval: int, + max_delay: int, + ): + """Initialize the listener.""" + self.hass = hass + self.queue = asyncio.PriorityQueue() + self._client_args = client_args + self._conn_str_client = conn_str_client + self._entities_filter = entities_filter + self._send_interval = send_interval + self._max_delay = max_delay + send_interval + self._listener_remover = None + self._next_send_remover = None + self.shutdown = False + + async def async_start(self): + """Start the recorder, suppress logging and register the callbacks and do the first send after five seconds, to capture the startup events.""" + # suppress the INFO and below logging on the underlying packages, they are very verbose, even at INFO + logging.getLogger("uamqp").setLevel(logging.WARNING) + logging.getLogger("azure.eventhub").setLevel(logging.WARNING) + + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_shutdown) + self._listener_remover = self.hass.bus.async_listen( + MATCH_ALL, self.async_listen + ) + # schedule the first send after 10 seconds to capture startup events, after that each send will schedule the next after the interval. + self._next_send_remover = async_call_later(self.hass, 10, self.async_send) + + async def async_shutdown(self, _: Event): + """Shut down the AEH by queueing None and calling send.""" + if self._next_send_remover: + self._next_send_remover() + if self._listener_remover: + self._listener_remover() + await self.queue.put((3, (time.monotonic(), None))) + await self.async_send(None) + + async def async_listen(self, event: Event): + """Listen for new messages on the bus and queue them for AEH.""" + await self.queue.put((2, (time.monotonic(), event))) + + async def async_send(self, _): + """Write preprocessed events to eventhub, with retry.""" + client = self._get_client() + async with client: + while not self.queue.empty(): + data_batch, dequeue_count = await self.fill_batch(client) + _LOGGER.debug( + "Sending %d event(s), out of %d events in the queue", + len(data_batch), + dequeue_count, + ) + if data_batch: + try: + await client.send_batch(data_batch) + except EventHubError as exc: + _LOGGER.error("Error in sending events to Event Hub: %s", exc) + finally: + for _ in range(dequeue_count): + self.queue.task_done() + await client.close() + + if not self.shutdown: + self._next_send_remover = async_call_later( + self.hass, self._send_interval, self.async_send + ) + + async def fill_batch(self, client): + """Return a batch of events formatted for writing. + + Uses get_nowait instead of await get, because the functions batches and doesn't wait for each single event, the send function is called. + + Throws ValueError on add to batch when the EventDataBatch object reaches max_size. Put the item back in the queue and the next batch will include it. + """ + event_batch = await client.create_batch() + dequeue_count = 0 + dropped = 0 + while not self.shutdown: + try: + _, (timestamp, event) = self.queue.get_nowait() + except asyncio.QueueEmpty: + break + dequeue_count += 1 + if not event: + self.shutdown = True + break + event_data = self._event_to_filtered_event_data(event) + if not event_data: + continue + if time.monotonic() - timestamp <= self._max_delay: + try: + event_batch.add(event_data) + except ValueError: + self.queue.put_nowait((1, (timestamp, event))) + break + else: + dropped += 1 + + if dropped: + _LOGGER.warning( + "Dropped %d old events, consider increasing the max_delay", dropped + ) + + return event_batch, dequeue_count + + def _event_to_filtered_event_data(self, event: Event): + """Filter event states and create EventData object.""" state = event.data.get("new_state") if ( state is None or state.state in (STATE_UNKNOWN, "", STATE_UNAVAILABLE) - or not entities_filter(state.entity_id) + or not self._entities_filter(state.entity_id) ): - return + return None + return EventData(json.dumps(obj=state, cls=JSONEncoder).encode("utf-8")) - event_data = EventData( - json.dumps(obj=state.as_dict(), default=encoder.encode).encode("utf-8") - ) - await async_sender.send(event_data) - - async def async_shutdown(event: Event): - """Shut down the client.""" - await client.stop_async() - - hass.bus.async_listen(EVENT_STATE_CHANGED, async_send_to_event_hub) - hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_shutdown) - - return True + def _get_client(self): + """Get a Event Producer Client.""" + if self._conn_str_client: + return EventHubProducerClient.from_connection_string( + **self._client_args, **ADDITIONAL_ARGS + ) + return EventHubProducerClient(**self._client_args, **ADDITIONAL_ARGS) diff --git a/homeassistant/components/azure_event_hub/const.py b/homeassistant/components/azure_event_hub/const.py new file mode 100644 index 00000000000..1786bb5cbf2 --- /dev/null +++ b/homeassistant/components/azure_event_hub/const.py @@ -0,0 +1,13 @@ +"""Constants and shared schema for the Azure Event Hub integration.""" +DOMAIN = "azure_event_hub" + +CONF_EVENT_HUB_NAMESPACE = "event_hub_namespace" +CONF_EVENT_HUB_INSTANCE_NAME = "event_hub_instance_name" +CONF_EVENT_HUB_SAS_POLICY = "event_hub_sas_policy" +CONF_EVENT_HUB_SAS_KEY = "event_hub_sas_key" +CONF_EVENT_HUB_CON_STRING = "event_hub_connection_string" +CONF_SEND_INTERVAL = "send_interval" +CONF_MAX_DELAY = "max_delay" +CONF_FILTER = "filter" + +ADDITIONAL_ARGS = {"logging_enable": False} diff --git a/homeassistant/components/azure_event_hub/manifest.json b/homeassistant/components/azure_event_hub/manifest.json index f9d4cf09e04..08bae34d731 100644 --- a/homeassistant/components/azure_event_hub/manifest.json +++ b/homeassistant/components/azure_event_hub/manifest.json @@ -2,6 +2,6 @@ "domain": "azure_event_hub", "name": "Azure Event Hub", "documentation": "https://www.home-assistant.io/integrations/azure_event_hub", - "requirements": ["azure-eventhub==1.3.1"], + "requirements": ["azure-eventhub==5.1.0"], "codeowners": ["@eavanvalkenburg"] } diff --git a/requirements_all.txt b/requirements_all.txt index 6471d73325e..a785ea2e137 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -309,7 +309,7 @@ avri-api==0.1.7 axis==29 # homeassistant.components.azure_event_hub -azure-eventhub==1.3.1 +azure-eventhub==5.1.0 # homeassistant.components.azure_service_bus azure-servicebus==0.50.1