From a278cf3db240ebf17a7f04c6794c937481d14b65 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Mon, 16 Mar 2020 23:58:50 -0600 Subject: [PATCH] Improve IQVIA data/API management based on enabled entities (#32291) * Improve IQVIA data/API management based on enabled entities * Code review comments * Code review * Cleanup * Linting * Code review * Code review --- homeassistant/components/iqvia/__init__.py | 172 +++++++++++++++------ homeassistant/components/iqvia/const.py | 2 +- homeassistant/components/iqvia/sensor.py | 36 ++--- 3 files changed, 140 insertions(+), 70 deletions(-) diff --git a/homeassistant/components/iqvia/__init__.py b/homeassistant/components/iqvia/__init__.py index a33dabeadeb..1f487dd345c 100644 --- a/homeassistant/components/iqvia/__init__.py +++ b/homeassistant/components/iqvia/__init__.py @@ -4,7 +4,7 @@ from datetime import timedelta import logging from pyiqvia import Client -from pyiqvia.errors import InvalidZipError +from pyiqvia.errors import InvalidZipError, IQVIAError import voluptuous as vol from homeassistant.config_entries import SOURCE_IMPORT @@ -17,7 +17,6 @@ from homeassistant.helpers.dispatcher import ( ) from homeassistant.helpers.entity import Entity from homeassistant.helpers.event import async_track_time_interval -from homeassistant.util.decorator import Registry from .config_flow import configured_instances from .const import ( @@ -43,20 +42,20 @@ from .const import ( _LOGGER = logging.getLogger(__name__) +API_CATEGORY_MAPPING = { + TYPE_ALLERGY_TODAY: TYPE_ALLERGY_INDEX, + TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX, + TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX, + TYPE_ASTHMA_TODAY: TYPE_ASTHMA_INDEX, + TYPE_ASTHMA_TOMORROW: TYPE_ALLERGY_INDEX, + TYPE_DISEASE_TODAY: TYPE_DISEASE_INDEX, +} + DATA_CONFIG = "config" DEFAULT_ATTRIBUTION = "Data provided by IQVIA™" DEFAULT_SCAN_INTERVAL = timedelta(minutes=30) -FETCHER_MAPPING = { - (TYPE_ALLERGY_FORECAST,): (TYPE_ALLERGY_FORECAST, TYPE_ALLERGY_OUTLOOK), - (TYPE_ALLERGY_TODAY, TYPE_ALLERGY_TOMORROW): (TYPE_ALLERGY_INDEX,), - (TYPE_ASTHMA_FORECAST,): (TYPE_ASTHMA_FORECAST,), - (TYPE_ASTHMA_TODAY, TYPE_ASTHMA_TOMORROW): (TYPE_ASTHMA_INDEX,), - (TYPE_DISEASE_FORECAST,): (TYPE_DISEASE_FORECAST,), - (TYPE_DISEASE_TODAY,): (TYPE_DISEASE_INDEX,), -} - CONFIG_SCHEMA = vol.Schema( { DOMAIN: vol.All( @@ -75,6 +74,12 @@ CONFIG_SCHEMA = vol.Schema( ) +@callback +def async_get_api_category(sensor_type): + """Return the API category that a particular sensor type should use.""" + return API_CATEGORY_MAPPING.get(sensor_type, sensor_type) + + async def async_setup(hass, config): """Set up the IQVIA component.""" hass.data[DOMAIN] = {} @@ -102,8 +107,9 @@ async def async_setup_entry(hass, config_entry): """Set up IQVIA as config entry.""" websession = aiohttp_client.async_get_clientsession(hass) + iqvia = IQVIAData(hass, Client(config_entry.data[CONF_ZIP_CODE], websession)) + try: - iqvia = IQVIAData(Client(config_entry.data[CONF_ZIP_CODE], websession)) await iqvia.async_update() except InvalidZipError: _LOGGER.error("Invalid ZIP code provided: %s", config_entry.data[CONF_ZIP_CODE]) @@ -115,16 +121,6 @@ async def async_setup_entry(hass, config_entry): hass.config_entries.async_forward_entry_setup(config_entry, "sensor") ) - async def refresh(event_time): - """Refresh IQVIA data.""" - _LOGGER.debug("Updating IQVIA data") - await iqvia.async_update() - async_dispatcher_send(hass, TOPIC_DATA_UPDATE) - - hass.data[DOMAIN][DATA_LISTENER][config_entry.entry_id] = async_track_time_interval( - hass, refresh, DEFAULT_SCAN_INTERVAL - ) - return True @@ -143,42 +139,99 @@ async def async_unload_entry(hass, config_entry): class IQVIAData: """Define a data object to retrieve info from IQVIA.""" - def __init__(self, client): + def __init__(self, hass, client): """Initialize.""" + self._async_cancel_time_interval_listener = None self._client = client + self._hass = hass self.data = {} self.zip_code = client.zip_code - self.fetchers = Registry() - self.fetchers.register(TYPE_ALLERGY_FORECAST)(self._client.allergens.extended) - self.fetchers.register(TYPE_ALLERGY_OUTLOOK)(self._client.allergens.outlook) - self.fetchers.register(TYPE_ALLERGY_INDEX)(self._client.allergens.current) - self.fetchers.register(TYPE_ASTHMA_FORECAST)(self._client.asthma.extended) - self.fetchers.register(TYPE_ASTHMA_INDEX)(self._client.asthma.current) - self.fetchers.register(TYPE_DISEASE_FORECAST)(self._client.disease.extended) - self.fetchers.register(TYPE_DISEASE_INDEX)(self._client.disease.current) + self._api_coros = { + TYPE_ALLERGY_FORECAST: client.allergens.extended, + TYPE_ALLERGY_INDEX: client.allergens.current, + TYPE_ALLERGY_OUTLOOK: client.allergens.outlook, + TYPE_ASTHMA_FORECAST: client.asthma.extended, + TYPE_ASTHMA_INDEX: client.asthma.current, + TYPE_DISEASE_FORECAST: client.disease.extended, + TYPE_DISEASE_INDEX: client.disease.current, + } + self._api_category_count = { + TYPE_ALLERGY_FORECAST: 0, + TYPE_ALLERGY_INDEX: 0, + TYPE_ALLERGY_OUTLOOK: 0, + TYPE_ASTHMA_FORECAST: 0, + TYPE_ASTHMA_INDEX: 0, + TYPE_DISEASE_FORECAST: 0, + TYPE_DISEASE_INDEX: 0, + } + self._api_category_locks = { + TYPE_ALLERGY_FORECAST: asyncio.Lock(), + TYPE_ALLERGY_INDEX: asyncio.Lock(), + TYPE_ALLERGY_OUTLOOK: asyncio.Lock(), + TYPE_ASTHMA_FORECAST: asyncio.Lock(), + TYPE_ASTHMA_INDEX: asyncio.Lock(), + TYPE_DISEASE_FORECAST: asyncio.Lock(), + TYPE_DISEASE_INDEX: asyncio.Lock(), + } + + async def _async_get_data_from_api(self, api_category): + """Update and save data for a particular API category.""" + if self._api_category_count[api_category] == 0: + return + + try: + self.data[api_category] = await self._api_coros[api_category]() + except IQVIAError as err: + _LOGGER.error("Unable to get %s data: %s", api_category, err) + self.data[api_category] = None + + async def _async_update_listener_action(self, now): + """Define an async_track_time_interval action to update data.""" + await self.async_update() + + @callback + def async_deregister_api_interest(self, sensor_type): + """Decrement the number of entities with data needs from an API category.""" + # If this deregistration should leave us with no registration at all, remove the + # time interval: + if sum(self._api_category_count.values()) == 0: + if self._async_cancel_time_interval_listener: + self._async_cancel_time_interval_listener() + self._async_cancel_time_interval_listener = None + return + + api_category = async_get_api_category(sensor_type) + self._api_category_count[api_category] -= 1 + + async def async_register_api_interest(self, sensor_type): + """Increment the number of entities with data needs from an API category.""" + # If this is the first registration we have, start a time interval: + if not self._async_cancel_time_interval_listener: + self._async_cancel_time_interval_listener = async_track_time_interval( + self._hass, self._async_update_listener_action, DEFAULT_SCAN_INTERVAL, + ) + + api_category = async_get_api_category(sensor_type) + self._api_category_count[api_category] += 1 + + # If a sensor registers interest in a particular API call and the data doesn't + # exist for it yet, make the API call and grab the data: + async with self._api_category_locks[api_category]: + if api_category not in self.data: + await self._async_get_data_from_api(api_category) async def async_update(self): """Update IQVIA data.""" - tasks = {} + tasks = [ + self._async_get_data_from_api(api_category) + for api_category in self._api_coros + ] - for conditions, fetcher_types in FETCHER_MAPPING.items(): - if not any(c in SENSORS for c in conditions): - continue + await asyncio.gather(*tasks) - for fetcher_type in fetcher_types: - tasks[fetcher_type] = self.fetchers[fetcher_type]() - - results = await asyncio.gather(*tasks.values(), return_exceptions=True) - - for key, result in zip(tasks, results): - if isinstance(result, Exception): - _LOGGER.error("Unable to get %s data: %s", key, result) - self.data[key] = {} - continue - - _LOGGER.debug("Loaded new %s data", key) - self.data[key] = result + _LOGGER.debug("Received new data") + async_dispatcher_send(self._hass, TOPIC_DATA_UPDATE) class IQVIAEntity(Entity): @@ -245,13 +298,34 @@ class IQVIAEntity(Entity): @callback def update(): """Update the state.""" - self.async_schedule_update_ha_state(True) + self.update_from_latest_data() + self.async_write_ha_state() self._async_unsub_dispatcher_connect = async_dispatcher_connect( self.hass, TOPIC_DATA_UPDATE, update ) + await self._iqvia.async_register_api_interest(self._type) + if self._type == TYPE_ALLERGY_FORECAST: + # Entities that express interest in allergy forecast data should also + # express interest in allergy outlook data: + await self._iqvia.async_register_api_interest(TYPE_ALLERGY_OUTLOOK) + + self.update_from_latest_data() + async def async_will_remove_from_hass(self): """Disconnect dispatcher listener when removed.""" if self._async_unsub_dispatcher_connect: self._async_unsub_dispatcher_connect() + self._async_unsub_dispatcher_connect = None + + self._iqvia.async_deregister_api_interest(self._type) + if self._type == TYPE_ALLERGY_FORECAST: + # Entities that lose interest in allergy forecast data should also lose + # interest in allergy outlook data: + self._iqvia.async_deregister_api_interest(TYPE_ALLERGY_OUTLOOK) + + @callback + def update_from_latest_data(self): + """Update the entity's state.""" + raise NotImplementedError() diff --git a/homeassistant/components/iqvia/const.py b/homeassistant/components/iqvia/const.py index 52e657bc2c0..95b03485597 100644 --- a/homeassistant/components/iqvia/const.py +++ b/homeassistant/components/iqvia/const.py @@ -25,9 +25,9 @@ SENSORS = { TYPE_ALLERGY_FORECAST: ("Allergy Index: Forecasted Average", "mdi:flower"), TYPE_ALLERGY_TODAY: ("Allergy Index: Today", "mdi:flower"), TYPE_ALLERGY_TOMORROW: ("Allergy Index: Tomorrow", "mdi:flower"), + TYPE_ASTHMA_FORECAST: ("Asthma Index: Forecasted Average", "mdi:flower"), TYPE_ASTHMA_TODAY: ("Asthma Index: Today", "mdi:flower"), TYPE_ASTHMA_TOMORROW: ("Asthma Index: Tomorrow", "mdi:flower"), - TYPE_ASTHMA_FORECAST: ("Asthma Index: Forecasted Average", "mdi:flower"), TYPE_DISEASE_FORECAST: ("Cold & Flu: Forecasted Average", "mdi:snowflake"), TYPE_DISEASE_TODAY: ("Cold & Flu Index: Today", "mdi:pill"), } diff --git a/homeassistant/components/iqvia/sensor.py b/homeassistant/components/iqvia/sensor.py index 1aae63a4908..5db4456b3c6 100644 --- a/homeassistant/components/iqvia/sensor.py +++ b/homeassistant/components/iqvia/sensor.py @@ -7,7 +7,6 @@ import numpy as np from homeassistant.components.iqvia import ( DATA_CLIENT, DOMAIN, - SENSORS, TYPE_ALLERGY_FORECAST, TYPE_ALLERGY_INDEX, TYPE_ALLERGY_OUTLOOK, @@ -23,6 +22,9 @@ from homeassistant.components.iqvia import ( IQVIAEntity, ) from homeassistant.const import ATTR_STATE +from homeassistant.core import callback + +from .const import SENSORS _LOGGER = logging.getLogger(__name__) @@ -65,13 +67,14 @@ async def async_setup_entry(hass, entry, async_add_entities): TYPE_DISEASE_TODAY: IndexSensor, } - sensors = [] - for sensor_type in SENSORS: - klass = sensor_class_mapping[sensor_type] - name, icon = SENSORS[sensor_type] - sensors.append(klass(iqvia, sensor_type, name, icon, iqvia.zip_code)) - - async_add_entities(sensors, True) + async_add_entities( + [ + sensor_class_mapping[sensor_type]( + iqvia, sensor_type, name, icon, iqvia.zip_code + ) + for sensor_type, (name, icon) in SENSORS.items() + ] + ) def calculate_trend(indices): @@ -93,9 +96,10 @@ def calculate_trend(indices): class ForecastSensor(IQVIAEntity): """Define sensor related to forecast data.""" - async def async_update(self): + @callback + def update_from_latest_data(self): """Update the sensor.""" - if not self._iqvia.data: + if not self._iqvia.data.get(self._type): return data = self._iqvia.data[self._type].get("Location") @@ -131,12 +135,10 @@ class ForecastSensor(IQVIAEntity): class IndexSensor(IQVIAEntity): """Define sensor related to indices.""" - async def async_update(self): + @callback + def update_from_latest_data(self): """Update the sensor.""" if not self._iqvia.data: - _LOGGER.warning( - "IQVIA didn't return data for %s; trying again later", self.name - ) return try: @@ -147,9 +149,6 @@ class IndexSensor(IQVIAEntity): elif self._type == TYPE_DISEASE_TODAY: data = self._iqvia.data[TYPE_DISEASE_INDEX].get("Location") except KeyError: - _LOGGER.warning( - "IQVIA didn't return data for %s; trying again later", self.name - ) return key = self._type.split("_")[-1].title() @@ -157,9 +156,6 @@ class IndexSensor(IQVIAEntity): try: [period] = [p for p in data["periods"] if p["Type"] == key] except ValueError: - _LOGGER.warning( - "IQVIA didn't return data for %s; trying again later", self.name - ) return [rating] = [