Use typed coordinator and runtime_data in eafm (#136629)

* Move coordinator and use runtime_data in eafm

* Add type hints
This commit is contained in:
epenet 2025-01-27 15:05:20 +01:00 committed by GitHub
parent 2878ba601b
commit 7dc2b92452
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 68 additions and 56 deletions

View File

@ -1,64 +1,22 @@
"""UK Environment Agency Flood Monitoring Integration."""
import asyncio
from datetime import timedelta
import logging
from typing import Any
from aioeafm import get_station
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import DOMAIN
from .coordinator import EafmConfigEntry, EafmCoordinator
PLATFORMS = [Platform.SENSOR]
_LOGGER = logging.getLogger(__name__)
def get_measures(station_data):
"""Force measure key to always be a list."""
if "measures" not in station_data:
return []
if isinstance(station_data["measures"], dict):
return [station_data["measures"]]
return station_data["measures"]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: EafmConfigEntry) -> bool:
"""Set up flood monitoring sensors for this config entry."""
station_key = entry.data["station"]
session = async_get_clientsession(hass=hass)
async def _async_update_data() -> dict[str, dict[str, Any]]:
# DataUpdateCoordinator will handle aiohttp ClientErrors and timeouts
async with asyncio.timeout(30):
data = await get_station(session, station_key)
measures = get_measures(data)
# Turn data.measures into a dict rather than a list so easier for entities to
# find themselves.
data["measures"] = {measure["@id"]: measure for measure in measures}
return data
coordinator = DataUpdateCoordinator[dict[str, dict[str, Any]]](
hass,
_LOGGER,
config_entry=entry,
name="sensor",
update_method=_async_update_data,
update_interval=timedelta(seconds=15 * 60),
)
coordinator = EafmCoordinator(hass, entry=entry)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: EafmConfigEntry) -> bool:
"""Unload flood monitoring sensors."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@ -0,0 +1,57 @@
"""UK Environment Agency Flood Monitoring Integration."""
import asyncio
from datetime import timedelta
import logging
from typing import Any
from aioeafm import get_station
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
PLATFORMS = [Platform.SENSOR]
_LOGGER = logging.getLogger(__name__)
type EafmConfigEntry = ConfigEntry[EafmCoordinator]
def _get_measures(station_data: dict[str, Any]) -> list[dict[str, Any]]:
"""Force measure key to always be a list."""
if "measures" not in station_data:
return []
if isinstance(station_data["measures"], dict):
return [station_data["measures"]]
return station_data["measures"]
class EafmCoordinator(DataUpdateCoordinator[dict[str, dict[str, Any]]]):
"""Class to manage fetching UK Flood Monitoring data."""
def __init__(self, hass: HomeAssistant, entry: EafmConfigEntry) -> None:
"""Initialize."""
self._station_key = entry.data["station"]
self._session = async_get_clientsession(hass=hass)
super().__init__(
hass,
_LOGGER,
config_entry=entry,
name="sensor",
update_interval=timedelta(seconds=15 * 60),
)
async def _async_update_data(self) -> dict[str, dict[str, Any]]:
"""Fetch the latest data from the source."""
# DataUpdateCoordinator will handle aiohttp ClientErrors and timeouts
async with asyncio.timeout(30):
data = await get_station(self._session, self._station_key)
measures = _get_measures(data)
# Turn data.measures into a dict rather than a list so easier for entities to
# find themselves.
data["measures"] = {measure["@id"]: measure for measure in measures}
return data

View File

@ -3,17 +3,14 @@
from typing import Any
from homeassistant.components.sensor import SensorEntity, SensorStateClass
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfLength
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import EafmConfigEntry, EafmCoordinator
UNIT_MAPPING = {
"http://qudt.org/1.1/vocab/unit#Meter": UnitOfLength.METERS,
@ -22,11 +19,11 @@ UNIT_MAPPING = {
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: EafmConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up UK Flood Monitoring Sensors."""
coordinator: DataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
coordinator = config_entry.runtime_data
created_entities: set[str] = set()
@callback
@ -70,7 +67,7 @@ class Measurement(CoordinatorEntity, SensorEntity):
_attr_has_entity_name = True
_attr_name = None
def __init__(self, coordinator, key):
def __init__(self, coordinator: EafmCoordinator, key: str) -> None:
"""Initialise the gauge with a data instance and station."""
super().__init__(coordinator)
self.key = key

View File

@ -15,5 +15,5 @@ def mock_get_stations():
@pytest.fixture
def mock_get_station():
"""Mock aioeafm.get_station."""
with patch("homeassistant.components.eafm.get_station") as patched:
with patch("homeassistant.components.eafm.coordinator.get_station") as patched:
yield patched