Files
core/homeassistant/components/airos/coordinator.py
Tom b67e85e8da Introduce Ubiquiti UISP airOS (#148989)
Co-authored-by: Norbert Rittel <norbert@rittel.de>
Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
2025-07-29 19:41:13 +02:00

67 lines
2.2 KiB
Python

"""DataUpdateCoordinator for AirOS."""
from __future__ import annotations
import logging
from airos.airos8 import AirOS, AirOSData
from airos.exceptions import (
ConnectionAuthenticationError,
ConnectionSetupError,
DataMissingError,
DeviceConnectionError,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, SCAN_INTERVAL
# Module-level logger; the coordinator below also passes it to its base class.
_LOGGER = logging.getLogger(__name__)

# Config entry alias parameterised with this integration's coordinator type.
# The `type` statement evaluates its value lazily, so referring to the
# coordinator class before it is defined is fine.
type AirOSConfigEntry = ConfigEntry[AirOSDataUpdateCoordinator]
class AirOSDataUpdateCoordinator(DataUpdateCoordinator[AirOSData]):
    """Coordinator that polls a single airOS device for its status data."""

    config_entry: AirOSConfigEntry

    def __init__(
        self,
        hass: HomeAssistant,
        config_entry: AirOSConfigEntry,
        airos_device: AirOS,
    ) -> None:
        """Keep the device client and set up the base coordinator.

        Args:
            hass: The Home Assistant instance handed to the base coordinator.
            config_entry: The config entry this coordinator belongs to.
            airos_device: Client used to log in to and query the device.
        """
        self.airos_device = airos_device
        super().__init__(
            hass,
            _LOGGER,
            config_entry=config_entry,
            name=DOMAIN,
            update_interval=SCAN_INTERVAL,
        )

    async def _async_update_data(self) -> AirOSData:
        """Log in to the device and return its current status.

        Returns:
            The status data reported by the device client.

        Raises:
            ConfigEntryError: The device rejected the login.
            UpdateFailed: The device could not be reached, the request timed
                out, or the response lacked expected data.
        """
        device = self.airos_device
        try:
            # A login is performed before every status request.
            await device.login()
            status = await device.status()
        except ConnectionAuthenticationError as exc:
            _LOGGER.exception("Error authenticating with airOS device")
            raise ConfigEntryError(
                translation_domain=DOMAIN, translation_key="invalid_auth"
            ) from exc
        except (ConnectionSetupError, DeviceConnectionError, TimeoutError) as exc:
            _LOGGER.error("Error connecting to airOS device: %s", exc)
            raise UpdateFailed(
                translation_domain=DOMAIN,
                translation_key="cannot_connect",
            ) from exc
        except DataMissingError as exc:
            _LOGGER.error("Expected data not returned by airOS device: %s", exc)
            raise UpdateFailed(
                translation_domain=DOMAIN,
                translation_key="error_data_missing",
            ) from exc
        return status