Move airnow coordinator to its own file (#99423)

This commit is contained in:
Jan-Philipp Benecke 2023-09-01 08:29:07 +02:00 committed by GitHub
parent dc4ed5fea9
commit bbc390837e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 93 deletions

View File

@ -34,6 +34,7 @@ omit =
homeassistant/components/agent_dvr/camera.py
homeassistant/components/agent_dvr/helpers.py
homeassistant/components/airnow/__init__.py
homeassistant/components/airnow/coordinator.py
homeassistant/components/airnow/sensor.py
homeassistant/components/airq/__init__.py
homeassistant/components/airq/coordinator.py

View File

@ -2,11 +2,6 @@
import datetime
import logging
from aiohttp.client_exceptions import ClientConnectorError
from pyairnow import WebServiceAPI
from pyairnow.conv import aqi_to_concentration
from pyairnow.errors import AirNowError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_API_KEY,
@ -17,26 +12,9 @@ from homeassistant.const import (
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
ATTR_API_AQI,
ATTR_API_AQI_DESCRIPTION,
ATTR_API_AQI_LEVEL,
ATTR_API_AQI_PARAM,
ATTR_API_CAT_DESCRIPTION,
ATTR_API_CAT_LEVEL,
ATTR_API_CATEGORY,
ATTR_API_PM25,
ATTR_API_POLLUTANT,
ATTR_API_REPORT_DATE,
ATTR_API_REPORT_HOUR,
ATTR_API_STATE,
ATTR_API_STATION,
ATTR_API_STATION_LATITUDE,
ATTR_API_STATION_LONGITUDE,
DOMAIN,
)
from .const import DOMAIN
from .coordinator import AirNowDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.SENSOR]
@ -107,72 +85,3 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)
class AirNowDataUpdateCoordinator(DataUpdateCoordinator):
"""Define an object to hold Airly data."""
def __init__(
self, hass, session, api_key, latitude, longitude, distance, update_interval
):
"""Initialize."""
self.latitude = latitude
self.longitude = longitude
self.distance = distance
self.airnow = WebServiceAPI(api_key, session=session)
super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=update_interval)
async def _async_update_data(self):
"""Update data via library."""
data = {}
try:
obs = await self.airnow.observations.latLong(
self.latitude,
self.longitude,
distance=self.distance,
)
except (AirNowError, ClientConnectorError) as error:
raise UpdateFailed(error) from error
if not obs:
raise UpdateFailed("No data was returned from AirNow")
max_aqi = 0
max_aqi_level = 0
max_aqi_desc = ""
max_aqi_poll = ""
for obv in obs:
# Convert AQIs to Concentration
pollutant = obv[ATTR_API_AQI_PARAM]
concentration = aqi_to_concentration(obv[ATTR_API_AQI], pollutant)
data[obv[ATTR_API_AQI_PARAM]] = concentration
# Overall AQI is the max of all pollutant AQIs
if obv[ATTR_API_AQI] > max_aqi:
max_aqi = obv[ATTR_API_AQI]
max_aqi_level = obv[ATTR_API_CATEGORY][ATTR_API_CAT_LEVEL]
max_aqi_desc = obv[ATTR_API_CATEGORY][ATTR_API_CAT_DESCRIPTION]
max_aqi_poll = pollutant
# Copy other data from PM2.5 Value
if obv[ATTR_API_AQI_PARAM] == ATTR_API_PM25:
# Copy Report Details
data[ATTR_API_REPORT_DATE] = obv[ATTR_API_REPORT_DATE]
data[ATTR_API_REPORT_HOUR] = obv[ATTR_API_REPORT_HOUR]
# Copy Station Details
data[ATTR_API_STATE] = obv[ATTR_API_STATE]
data[ATTR_API_STATION] = obv[ATTR_API_STATION]
data[ATTR_API_STATION_LATITUDE] = obv[ATTR_API_STATION_LATITUDE]
data[ATTR_API_STATION_LONGITUDE] = obv[ATTR_API_STATION_LONGITUDE]
# Store Overall AQI
data[ATTR_API_AQI] = max_aqi
data[ATTR_API_AQI_LEVEL] = max_aqi_level
data[ATTR_API_AQI_DESCRIPTION] = max_aqi_desc
data[ATTR_API_POLLUTANT] = max_aqi_poll
return data

View File

@ -0,0 +1,99 @@
"""DataUpdateCoordinator for the AirNow integration."""
import logging
from aiohttp.client_exceptions import ClientConnectorError
from pyairnow import WebServiceAPI
from pyairnow.conv import aqi_to_concentration
from pyairnow.errors import AirNowError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
ATTR_API_AQI,
ATTR_API_AQI_DESCRIPTION,
ATTR_API_AQI_LEVEL,
ATTR_API_AQI_PARAM,
ATTR_API_CAT_DESCRIPTION,
ATTR_API_CAT_LEVEL,
ATTR_API_CATEGORY,
ATTR_API_PM25,
ATTR_API_POLLUTANT,
ATTR_API_REPORT_DATE,
ATTR_API_REPORT_HOUR,
ATTR_API_STATE,
ATTR_API_STATION,
ATTR_API_STATION_LATITUDE,
ATTR_API_STATION_LONGITUDE,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
class AirNowDataUpdateCoordinator(DataUpdateCoordinator):
"""The AirNow update coordinator."""
def __init__(
self, hass, session, api_key, latitude, longitude, distance, update_interval
):
"""Initialize."""
self.latitude = latitude
self.longitude = longitude
self.distance = distance
self.airnow = WebServiceAPI(api_key, session=session)
super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=update_interval)
async def _async_update_data(self):
"""Update data via library."""
data = {}
try:
obs = await self.airnow.observations.latLong(
self.latitude,
self.longitude,
distance=self.distance,
)
except (AirNowError, ClientConnectorError) as error:
raise UpdateFailed(error) from error
if not obs:
raise UpdateFailed("No data was returned from AirNow")
max_aqi = 0
max_aqi_level = 0
max_aqi_desc = ""
max_aqi_poll = ""
for obv in obs:
# Convert AQIs to Concentration
pollutant = obv[ATTR_API_AQI_PARAM]
concentration = aqi_to_concentration(obv[ATTR_API_AQI], pollutant)
data[obv[ATTR_API_AQI_PARAM]] = concentration
# Overall AQI is the max of all pollutant AQIs
if obv[ATTR_API_AQI] > max_aqi:
max_aqi = obv[ATTR_API_AQI]
max_aqi_level = obv[ATTR_API_CATEGORY][ATTR_API_CAT_LEVEL]
max_aqi_desc = obv[ATTR_API_CATEGORY][ATTR_API_CAT_DESCRIPTION]
max_aqi_poll = pollutant
# Copy other data from PM2.5 Value
if obv[ATTR_API_AQI_PARAM] == ATTR_API_PM25:
# Copy Report Details
data[ATTR_API_REPORT_DATE] = obv[ATTR_API_REPORT_DATE]
data[ATTR_API_REPORT_HOUR] = obv[ATTR_API_REPORT_HOUR]
# Copy Station Details
data[ATTR_API_STATE] = obv[ATTR_API_STATE]
data[ATTR_API_STATION] = obv[ATTR_API_STATION]
data[ATTR_API_STATION_LATITUDE] = obv[ATTR_API_STATION_LATITUDE]
data[ATTR_API_STATION_LONGITUDE] = obv[ATTR_API_STATION_LONGITUDE]
# Store Overall AQI
data[ATTR_API_AQI] = max_aqi
data[ATTR_API_AQI_LEVEL] = max_aqi_level
data[ATTR_API_AQI_DESCRIPTION] = max_aqi_desc
data[ATTR_API_POLLUTANT] = max_aqi_poll
return data