Implement DataUpdateCoordinator in luftdaten (#62313)

* Implement DataUpdateCoordinator in luftdaten

* Typing additions/fixes

* Update homeassistant/components/luftdaten/sensor.py

Co-authored-by: epenet <6771947+epenet@users.noreply.github.com>

Co-authored-by: epenet <6771947+epenet@users.noreply.github.com>
This commit is contained in:
Franck Nijhof 2021-12-21 22:51:47 +01:00 committed by GitHub
parent d82e8b6cc0
commit 82013e68fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 144 additions and 205 deletions

View File

@ -2,159 +2,69 @@
from __future__ import annotations
import logging
from typing import Any
from luftdaten import Luftdaten
from luftdaten.exceptions import LuftdatenError
from homeassistant.components.sensor import SensorDeviceClass, SensorEntityDescription
from homeassistant.const import (
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
CONF_MONITORED_CONDITIONS,
CONF_SCAN_INTERVAL,
CONF_SENSORS,
PERCENTAGE,
PRESSURE_PA,
TEMP_CELSIUS,
Platform,
)
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_SENSOR_ID, DEFAULT_SCAN_INTERVAL, DOMAIN
_LOGGER = logging.getLogger(__name__)
DATA_LUFTDATEN = "luftdaten"
DATA_LUFTDATEN_CLIENT = "data_luftdaten_client"
DATA_LUFTDATEN_LISTENER = "data_luftdaten_listener"
PLATFORMS = [Platform.SENSOR]
TOPIC_UPDATE = f"{DOMAIN}_data_update"
SENSOR_TYPES: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
key="temperature",
name="Temperature",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
),
SensorEntityDescription(
key="humidity",
name="Humidity",
icon="mdi:water-percent",
native_unit_of_measurement=PERCENTAGE,
device_class=SensorDeviceClass.HUMIDITY,
),
SensorEntityDescription(
key="pressure",
name="Pressure",
icon="mdi:arrow-down-bold",
native_unit_of_measurement=PRESSURE_PA,
device_class=SensorDeviceClass.PRESSURE,
),
SensorEntityDescription(
key="pressure_at_sealevel",
name="Pressure at sealevel",
icon="mdi:download",
native_unit_of_measurement=PRESSURE_PA,
device_class=SensorDeviceClass.PRESSURE,
),
SensorEntityDescription(
key="P1",
name="PM10",
icon="mdi:thought-bubble",
native_unit_of_measurement=CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
),
SensorEntityDescription(
key="P2",
name="PM2.5",
icon="mdi:thought-bubble-outline",
native_unit_of_measurement=CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
),
)
SENSOR_KEYS: list[str] = [desc.key for desc in SENSOR_TYPES]
CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False)
async def async_setup_entry(hass, config_entry):
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Luftdaten as config entry."""
hass.data.setdefault(
DOMAIN,
{
DATA_LUFTDATEN_CLIENT: {},
DATA_LUFTDATEN_LISTENER: {},
},
)
# For backwards compat, set unique ID
if config_entry.unique_id is None:
if entry.unique_id is None:
hass.config_entries.async_update_entry(
config_entry, unique_id=config_entry.data[CONF_SENSOR_ID]
entry, unique_id=entry.data[CONF_SENSOR_ID]
)
try:
luftdaten = LuftDatenData(
Luftdaten(config_entry.data[CONF_SENSOR_ID]),
config_entry.data.get(CONF_SENSORS, {}).get(
CONF_MONITORED_CONDITIONS, SENSOR_KEYS
),
)
await luftdaten.async_update()
hass.data[DOMAIN][DATA_LUFTDATEN_CLIENT][config_entry.entry_id] = luftdaten
except LuftdatenError as err:
raise ConfigEntryNotReady from err
luftdaten = Luftdaten(entry.data[CONF_SENSOR_ID])
hass.config_entries.async_setup_platforms(config_entry, PLATFORMS)
async def async_update() -> dict[Any, Any]:
"""Update sensor/binary sensor data."""
try:
await luftdaten.get_data()
except LuftdatenError as err:
raise UpdateFailed("Unable to retrieve data from luftdaten.info") from err
async def refresh_sensors(event_time):
"""Refresh Luftdaten data."""
await luftdaten.async_update()
async_dispatcher_send(hass, TOPIC_UPDATE)
if not luftdaten.values:
raise UpdateFailed("Did not receive sensor data from luftdaten.info")
hass.data[DOMAIN][DATA_LUFTDATEN_LISTENER][
config_entry.entry_id
] = async_track_time_interval(
data = luftdaten.values
data.update(luftdaten.meta)
return data
coordinator: DataUpdateCoordinator[dict[Any, Any]] = DataUpdateCoordinator(
hass,
refresh_sensors,
hass.data[DOMAIN].get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL),
_LOGGER,
name=f"{DOMAIN}_{luftdaten.sensor_id}",
update_interval=DEFAULT_SCAN_INTERVAL,
update_method=async_update,
)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(hass, config_entry):
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload an Luftdaten config entry."""
remove_listener = hass.data[DOMAIN][DATA_LUFTDATEN_LISTENER].pop(
config_entry.entry_id
)
remove_listener()
hass.data[DOMAIN][DATA_LUFTDATEN_CLIENT].pop(config_entry.entry_id)
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
class LuftDatenData:
"""Define a generic Luftdaten object."""
def __init__(self, client, sensor_conditions):
"""Initialize the Luftdata object."""
self.client = client
self.data = {}
self.sensor_conditions = sensor_conditions
async def async_update(self):
"""Update sensor/binary sensor data."""
try:
await self.client.get_data()
if self.client.values:
self.data[DATA_LUFTDATEN] = self.client.values
self.data[DATA_LUFTDATEN].update(self.client.meta)
except LuftdatenError:
_LOGGER.error("Unable to retrieve data from luftdaten.info")
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
del hass.data[DOMAIN][entry.entry_id]
return unload_ok

View File

@ -1,98 +1,127 @@
"""Support for Luftdaten sensors."""
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE, CONF_SHOW_ON_MAP
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from __future__ import annotations
from . import DATA_LUFTDATEN, DATA_LUFTDATEN_CLIENT, DOMAIN, SENSOR_TYPES, TOPIC_UPDATE
from .const import ATTR_SENSOR_ID
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
CONF_SHOW_ON_MAP,
PERCENTAGE,
PRESSURE_PA,
TEMP_CELSIUS,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from . import DOMAIN
from .const import ATTR_SENSOR_ID, CONF_SENSOR_ID
SENSORS: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
key="temperature",
name="Temperature",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
),
SensorEntityDescription(
key="humidity",
name="Humidity",
icon="mdi:water-percent",
native_unit_of_measurement=PERCENTAGE,
device_class=SensorDeviceClass.HUMIDITY,
),
SensorEntityDescription(
key="pressure",
name="Pressure",
icon="mdi:arrow-down-bold",
native_unit_of_measurement=PRESSURE_PA,
device_class=SensorDeviceClass.PRESSURE,
),
SensorEntityDescription(
key="pressure_at_sealevel",
name="Pressure at sealevel",
icon="mdi:download",
native_unit_of_measurement=PRESSURE_PA,
device_class=SensorDeviceClass.PRESSURE,
),
SensorEntityDescription(
key="P1",
name="PM10",
icon="mdi:thought-bubble",
native_unit_of_measurement=CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
),
SensorEntityDescription(
key="P2",
name="PM2.5",
icon="mdi:thought-bubble-outline",
native_unit_of_measurement=CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
),
)
async def async_setup_entry(hass, entry, async_add_entities):
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up a Luftdaten sensor based on a config entry."""
luftdaten = hass.data[DOMAIN][DATA_LUFTDATEN_CLIENT][entry.entry_id]
coordinator = hass.data[DOMAIN][entry.entry_id]
entities = [
LuftdatenSensor(luftdaten, description, entry.data[CONF_SHOW_ON_MAP])
for description in SENSOR_TYPES
if description.key in luftdaten.sensor_conditions
]
async_add_entities(entities, True)
async_add_entities(
LuftdatenSensor(
coordinator=coordinator,
description=description,
sensor_id=entry.data[CONF_SENSOR_ID],
show_on_map=entry.data[CONF_SHOW_ON_MAP],
)
for description in SENSORS
if description.key in coordinator.data
)
class LuftdatenSensor(SensorEntity):
class LuftdatenSensor(CoordinatorEntity, SensorEntity):
"""Implementation of a Luftdaten sensor."""
_attr_attribution = "Data provided by luftdaten.info"
_attr_should_poll = False
def __init__(self, luftdaten, description: SensorEntityDescription, show):
def __init__(
self,
*,
coordinator: DataUpdateCoordinator,
description: SensorEntityDescription,
sensor_id: int,
show_on_map: bool,
) -> None:
"""Initialize the Luftdaten sensor."""
super().__init__(coordinator=coordinator)
self.entity_description = description
self._async_unsub_dispatcher_connect = None
self.luftdaten = luftdaten
self._data = None
self._show_on_map = show
self._attrs = {}
self._attr_unique_id = f"{sensor_id}_{description.key}"
self._attr_extra_state_attributes = {
ATTR_SENSOR_ID: sensor_id,
}
if show_on_map:
self._attr_extra_state_attributes[ATTR_LONGITUDE] = coordinator.data[
"longitude"
]
self._attr_extra_state_attributes[ATTR_LATITUDE] = coordinator.data[
"latitude"
]
@property
def native_value(self):
def native_value(self) -> float | None:
"""Return the state of the device."""
if self._data is not None:
try:
return self._data[self.entity_description.key]
except KeyError:
return None
@property
def unique_id(self) -> str:
"""Return a unique, friendly identifier for this entity."""
if self._data is not None:
try:
return f"{self._data['sensor_id']}_{self.entity_description.key}"
except KeyError:
return None
@property
def extra_state_attributes(self):
"""Return the state attributes."""
if self._data is not None:
try:
self._attrs[ATTR_SENSOR_ID] = self._data["sensor_id"]
except KeyError:
return None
on_map = ATTR_LATITUDE, ATTR_LONGITUDE
no_map = "lat", "long"
lat_format, lon_format = on_map if self._show_on_map else no_map
try:
self._attrs[lon_format] = self._data["longitude"]
self._attrs[lat_format] = self._data["latitude"]
return self._attrs
except KeyError:
return
async def async_added_to_hass(self):
"""Register callbacks."""
@callback
def update():
"""Update the state."""
self.async_schedule_update_ha_state(True)
self._async_unsub_dispatcher_connect = async_dispatcher_connect(
self.hass, TOPIC_UPDATE, update
)
async def async_will_remove_from_hass(self):
"""Disconnect dispatcher listener when removed."""
if self._async_unsub_dispatcher_connect:
self._async_unsub_dispatcher_connect()
async def async_update(self):
"""Get the latest data and update the state."""
try:
self._data = self.luftdaten.data[DATA_LUFTDATEN]
except KeyError:
return
if (
not self.coordinator.data
or (value := self.coordinator.data.get(self.entity_description.key)) is None
):
return None
return value