Files
core/homeassistant/components/altruist/coordinator.py
Alena Bugrova f8267b13d7 Add Altruist integration to Core (#146158)
* add altruist integration and tests

* requested fixes + remove some deprecated sensors

* add tests for unknown sensor and device attribute in config_flow

* use CONF_ in data_schema

* suggested fixes

* remove test_setup_entry_success

* create ZeroconfServiceInfo in tests

* use CONF_IP_ADDRESS in tests

* add unique id assert

* add integration to strict-typing, set unavailable if no sensor key in data, change device name

* use add_suggested_values_to_schema, mmHg for pressure

* update snapshots and config entry name in tests

* remove changes in devcontainer config

* fixture for create client error, typing in tests, remove "Altruist" from device name

* change native_value_fn return type

* change sensor.py docstring

* remove device id from entry data, fix docstrings

* remove checks for client and device attributes

* use less variables in tests

* change creating AltruistSensor, remove device from arguments

* Update homeassistant/components/altruist/sensor.py

* Update homeassistant/components/altruist/quality_scale.yaml

* Update homeassistant/components/altruist/quality_scale.yaml

* Update quality_scale.yaml

* hassfest run

* suggested fixes

* set suggested_unit_of_measurement for pressure

* use mock_config_entry, update snapshots

* abort if cant create client on zeroconf step

* move sensor names in translatin placeholders

---------

Co-authored-by: Josef Zweck <josef@zweck.dev>
2025-06-23 16:57:51 +02:00

65 lines
2.1 KiB
Python

"""Coordinator module for Altruist integration in Home Assistant.
This module defines the AltruistDataUpdateCoordinator class, which manages
data updates for Altruist sensors using the AltruistClient.
"""
from datetime import timedelta
import logging
from altruistclient import AltruistClient, AltruistError
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_HOST
_LOGGER = logging.getLogger(__name__)
UPDATE_INTERVAL = timedelta(seconds=15)
type AltruistConfigEntry = ConfigEntry[AltruistDataUpdateCoordinator]
class AltruistDataUpdateCoordinator(DataUpdateCoordinator[dict[str, str]]):
"""Coordinates data updates for Altruist sensors."""
client: AltruistClient
def __init__(
self,
hass: HomeAssistant,
config_entry: AltruistConfigEntry,
) -> None:
"""Initialize the data update coordinator for Altruist sensors."""
device_id = config_entry.unique_id
super().__init__(
hass,
logger=_LOGGER,
config_entry=config_entry,
name=f"Altruist {device_id}",
update_interval=UPDATE_INTERVAL,
)
self._ip_address = config_entry.data[CONF_HOST]
async def _async_setup(self) -> None:
try:
self.client = await AltruistClient.from_ip_address(
async_get_clientsession(self.hass), self._ip_address
)
await self.client.fetch_data()
except AltruistError as e:
raise ConfigEntryNotReady("Error in Altruist setup") from e
async def _async_update_data(self) -> dict[str, str]:
try:
fetched_data = await self.client.fetch_data()
except AltruistError as ex:
raise UpdateFailed(
f"The Altruist {self.client.device_id} is unavailable: {ex}"
) from ex
return {item["value_type"]: item["value"] for item in fetched_data}