Allen Porter 490d2cfb71
Move Rainbird to async client library (#84417)
* Bump pyrainbird to 0.7.0 and move to async library

* Share updates across sensors

* Fix test version and delete dead code

* Add test coverage for yaml configuration

* Address PR feedback
2022-12-22 13:00:17 -08:00

48 lines
1.3 KiB
Python

"""Update coordinators for rainbird."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
import datetime
import logging
from typing import TypeVar
import async_timeout
from pyrainbird.async_client import RainbirdApiException
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
# Upper bound, in seconds, on one update call; applied through
# async_timeout.timeout() in RainbirdUpdateCoordinator._async_update_data.
TIMEOUT_SECONDS = 20
# Passed to DataUpdateCoordinator as its update_interval.
UPDATE_INTERVAL = datetime.timedelta(minutes=1)
# Module logger; also handed to the coordinator base class.
_LOGGER = logging.getLogger(__name__)
# Type of the data produced by a coordinator's update method.
_T = TypeVar("_T")
class RainbirdUpdateCoordinator(DataUpdateCoordinator[_T]):
    """Coordinator for rainbird API calls.

    Generic over ``_T``, the type of data produced by the supplied update
    method. Each update call is bounded by ``TIMEOUT_SECONDS`` and the
    base class is configured with ``UPDATE_INTERVAL`` as its update
    interval.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        update_method: Callable[[], Awaitable[_T]],
        *,
        name: str = "Rainbird Zones",
    ) -> None:
        """Initialize RainbirdUpdateCoordinator.

        ``hass`` and ``update_method`` are forwarded unchanged to the base
        coordinator; ``update_method`` is a zero-argument callable whose
        awaited result is the coordinator data.

        ``name`` is the label forwarded to the base coordinator. It
        defaults to the previously hard-coded "Rainbird Zones" so existing
        callers are unaffected, while callers that track something other
        than zones can pass a more accurate label.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=name,
            update_method=update_method,
            update_interval=UPDATE_INTERVAL,
        )

    async def _async_update_data(self) -> _T:
        """Fetch data from API endpoint.

        Awaits ``self.update_method()`` under a ``TIMEOUT_SECONDS``
        timeout and returns its result.

        Raises ``UpdateFailed`` (chained to the original error) when the
        call raises ``RainbirdApiException``. An expired timeout is not
        handled here and propagates to the caller.
        """
        try:
            async with async_timeout.timeout(TIMEOUT_SECONDS):
                # update_method is presumably stored by the base class from
                # the keyword passed in __init__; the ignore silences the
                # type checker's complaint about calling it.
                return await self.update_method()  # type: ignore[misc]
        except RainbirdApiException as err:
            raise UpdateFailed(f"Error communicating with API: {err}") from err