Michael c595b12e98
Explicitly pass in the config_entry in airzone coordinator init (#137702)
explicitly pass in the config_entry in airzone coordinator init
2025-02-07 20:41:53 +01:00

56 lines
1.5 KiB
Python

"""The Airzone integration."""
from __future__ import annotations
from asyncio import timeout
from datetime import timedelta
import logging
from typing import Any
from aioairzone.exceptions import AirzoneError
from aioairzone.localapi import AirzoneLocalApi
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import AIOAIRZONE_DEVICE_TIMEOUT_SEC, DOMAIN
SCAN_INTERVAL = timedelta(seconds=60)
_LOGGER = logging.getLogger(__name__)
type AirzoneConfigEntry = ConfigEntry[AirzoneUpdateCoordinator]
class AirzoneUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Coordinator that refreshes data from an Airzone local API client."""

    config_entry: AirzoneConfigEntry

    def __init__(
        self,
        hass: HomeAssistant,
        config_entry: AirzoneConfigEntry,
        airzone: AirzoneLocalApi,
    ) -> None:
        """Keep the API client and set up the base coordinator.

        The config entry is forwarded explicitly to the base class, together
        with the integration domain as the coordinator name and the module's
        scan interval as the update interval.
        """
        # The client is stored before the base initializer runs.
        self.airzone = airzone
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=SCAN_INTERVAL,
            config_entry=config_entry,
        )

    async def _async_update_data(self) -> dict[str, Any]:
        """Ask the client to update and return its current data.

        Both the update and the data read happen inside an asyncio timeout of
        ``AIOAIRZONE_DEVICE_TIMEOUT_SEC``; a timeout is not handled here and
        propagates to the caller.

        Raises:
            UpdateFailed: if the client raises ``AirzoneError`` while updating.
        """
        async with timeout(AIOAIRZONE_DEVICE_TIMEOUT_SEC):
            try:
                await self.airzone.update()
            except AirzoneError as err:
                # Re-raise as the coordinator's failure type, keeping the cause.
                raise UpdateFailed(err) from err
            else:
                return self.airzone.data()