Move huisbaasje coordinator to separate module (#144955)

This commit is contained in:
epenet 2025-05-16 12:07:19 +02:00 committed by GitHub
parent 97869636f8
commit b4a1bdcb83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 136 additions and 113 deletions

View File

@ -1,29 +1,15 @@
"""The EnergyFlip integration.""" """The EnergyFlip integration."""
import asyncio
from datetime import timedelta
import logging import logging
from typing import Any
from energyflip import EnergyFlip, EnergyFlipException from energyflip import EnergyFlip, EnergyFlipException
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, Platform from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, Platform
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import ( from .const import DATA_COORDINATOR, DOMAIN, FETCH_TIMEOUT, SOURCE_TYPES
DATA_COORDINATOR, from .coordinator import EnergyFlipUpdateCoordinator
DOMAIN,
FETCH_TIMEOUT,
POLLING_INTERVAL,
SENSOR_TYPE_RATE,
SENSOR_TYPE_THIS_DAY,
SENSOR_TYPE_THIS_MONTH,
SENSOR_TYPE_THIS_WEEK,
SENSOR_TYPE_THIS_YEAR,
SOURCE_TYPES,
)
PLATFORMS = [Platform.SENSOR] PLATFORMS = [Platform.SENSOR]
@ -47,18 +33,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
_LOGGER.error("Authentication failed: %s", str(exception)) _LOGGER.error("Authentication failed: %s", str(exception))
return False return False
async def async_update_data() -> dict[str, dict[str, Any]]:
return await async_update_energyflip(energyflip)
# Create a coordinator for polling updates # Create a coordinator for polling updates
coordinator = DataUpdateCoordinator( coordinator = EnergyFlipUpdateCoordinator(hass, entry, energyflip)
hass,
_LOGGER,
config_entry=entry,
name="sensor",
update_method=async_update_data,
update_interval=timedelta(seconds=POLLING_INTERVAL),
)
await coordinator.async_config_entry_first_refresh() await coordinator.async_config_entry_first_refresh()
@ -81,77 +57,3 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data[DOMAIN].pop(entry.entry_id) hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok return unload_ok
async def async_update_energyflip(energyflip: EnergyFlip) -> dict[str, dict[str, Any]]:
"""Update the data by performing a request to EnergyFlip."""
try:
# Note: TimeoutError and aiohttp.ClientError are already
# handled by the data update coordinator.
async with asyncio.timeout(FETCH_TIMEOUT):
if not energyflip.is_authenticated():
_LOGGER.warning("EnergyFlip is unauthenticated. Reauthenticating")
await energyflip.authenticate()
current_measurements = await energyflip.current_measurements()
return {
source_type: {
SENSOR_TYPE_RATE: _get_measurement_rate(
current_measurements, source_type
),
SENSOR_TYPE_THIS_DAY: _get_cumulative_value(
current_measurements, source_type, SENSOR_TYPE_THIS_DAY
),
SENSOR_TYPE_THIS_WEEK: _get_cumulative_value(
current_measurements, source_type, SENSOR_TYPE_THIS_WEEK
),
SENSOR_TYPE_THIS_MONTH: _get_cumulative_value(
current_measurements, source_type, SENSOR_TYPE_THIS_MONTH
),
SENSOR_TYPE_THIS_YEAR: _get_cumulative_value(
current_measurements, source_type, SENSOR_TYPE_THIS_YEAR
),
}
for source_type in SOURCE_TYPES
}
except EnergyFlipException as exception:
raise UpdateFailed(f"Error communicating with API: {exception}") from exception
def _get_cumulative_value(
current_measurements: dict,
source_type: str,
period_type: str,
):
"""Get the cumulative energy consumption for a certain period.
:param current_measurements: The result from the EnergyFlip client
:param source_type: The source of energy (electricity or gas)
:param period_type: The period for which cumulative value should be given.
"""
if source_type in current_measurements:
if (
period_type in current_measurements[source_type]
and current_measurements[source_type][period_type] is not None
):
return current_measurements[source_type][period_type]["value"]
else:
_LOGGER.error(
"Source type %s not present in %s", source_type, current_measurements
)
return None
def _get_measurement_rate(current_measurements: dict, source_type: str):
if source_type in current_measurements:
if (
"measurement" in current_measurements[source_type]
and current_measurements[source_type]["measurement"] is not None
):
return current_measurements[source_type]["measurement"]["rate"]
else:
_LOGGER.error(
"Source type %s not present in %s", source_type, current_measurements
)
return None

View File

@ -0,0 +1,126 @@
"""The EnergyFlip integration."""
import asyncio
from datetime import timedelta
import logging
from typing import Any
from energyflip import EnergyFlip, EnergyFlipException
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
FETCH_TIMEOUT,
POLLING_INTERVAL,
SENSOR_TYPE_RATE,
SENSOR_TYPE_THIS_DAY,
SENSOR_TYPE_THIS_MONTH,
SENSOR_TYPE_THIS_WEEK,
SENSOR_TYPE_THIS_YEAR,
SOURCE_TYPES,
)
PLATFORMS = [Platform.SENSOR]
_LOGGER = logging.getLogger(__name__)
class EnergyFlipUpdateCoordinator(DataUpdateCoordinator[dict[str, dict[str, Any]]]):
"""EnergyFlip data update coordinator."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: ConfigEntry,
energyflip: EnergyFlip,
) -> None:
"""Initialize the Huisbaasje data coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name="sensor",
update_interval=timedelta(seconds=POLLING_INTERVAL),
)
self._energyflip = energyflip
async def _async_update_data(self) -> dict[str, dict[str, Any]]:
"""Update the data by performing a request to EnergyFlip."""
try:
# Note: TimeoutError and aiohttp.ClientError are already
# handled by the data update coordinator.
async with asyncio.timeout(FETCH_TIMEOUT):
if not self._energyflip.is_authenticated():
_LOGGER.warning("EnergyFlip is unauthenticated. Reauthenticating")
await self._energyflip.authenticate()
current_measurements = await self._energyflip.current_measurements()
return {
source_type: {
SENSOR_TYPE_RATE: _get_measurement_rate(
current_measurements, source_type
),
SENSOR_TYPE_THIS_DAY: _get_cumulative_value(
current_measurements, source_type, SENSOR_TYPE_THIS_DAY
),
SENSOR_TYPE_THIS_WEEK: _get_cumulative_value(
current_measurements, source_type, SENSOR_TYPE_THIS_WEEK
),
SENSOR_TYPE_THIS_MONTH: _get_cumulative_value(
current_measurements, source_type, SENSOR_TYPE_THIS_MONTH
),
SENSOR_TYPE_THIS_YEAR: _get_cumulative_value(
current_measurements, source_type, SENSOR_TYPE_THIS_YEAR
),
}
for source_type in SOURCE_TYPES
}
except EnergyFlipException as exception:
raise UpdateFailed(
f"Error communicating with API: {exception}"
) from exception
def _get_cumulative_value(
current_measurements: dict,
source_type: str,
period_type: str,
):
"""Get the cumulative energy consumption for a certain period.
:param current_measurements: The result from the EnergyFlip client
:param source_type: The source of energy (electricity or gas)
:param period_type: The period for which cumulative value should be given.
"""
if source_type in current_measurements:
if (
period_type in current_measurements[source_type]
and current_measurements[source_type][period_type] is not None
):
return current_measurements[source_type][period_type]["value"]
else:
_LOGGER.error(
"Source type %s not present in %s", source_type, current_measurements
)
return None
def _get_measurement_rate(current_measurements: dict, source_type: str):
if source_type in current_measurements:
if (
"measurement" in current_measurements[source_type]
and current_measurements[source_type]["measurement"] is not None
):
return current_measurements[source_type]["measurement"]["rate"]
else:
_LOGGER.error(
"Source type %s not present in %s", source_type, current_measurements
)
return None

View File

@ -4,7 +4,6 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
import logging import logging
from typing import Any
from energyflip.const import ( from energyflip.const import (
SOURCE_TYPE_ELECTRICITY, SOURCE_TYPE_ELECTRICITY,
@ -31,10 +30,7 @@ from homeassistant.const import (
) )
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import ( from homeassistant.helpers.update_coordinator import CoordinatorEntity
CoordinatorEntity,
DataUpdateCoordinator,
)
from .const import ( from .const import (
DATA_COORDINATOR, DATA_COORDINATOR,
@ -45,6 +41,7 @@ from .const import (
SENSOR_TYPE_THIS_WEEK, SENSOR_TYPE_THIS_WEEK,
SENSOR_TYPE_THIS_YEAR, SENSOR_TYPE_THIS_YEAR,
) )
from .coordinator import EnergyFlipUpdateCoordinator
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -222,9 +219,9 @@ async def async_setup_entry(
async_add_entities: AddConfigEntryEntitiesCallback, async_add_entities: AddConfigEntryEntitiesCallback,
) -> None: ) -> None:
"""Set up the sensor platform.""" """Set up the sensor platform."""
coordinator: DataUpdateCoordinator[dict[str, dict[str, Any]]] = hass.data[DOMAIN][ coordinator: EnergyFlipUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id][
config_entry.entry_id DATA_COORDINATOR
][DATA_COORDINATOR] ]
user_id = config_entry.data[CONF_ID] user_id = config_entry.data[CONF_ID]
async_add_entities( async_add_entities(
@ -233,9 +230,7 @@ async def async_setup_entry(
) )
class EnergyFlipSensor( class EnergyFlipSensor(CoordinatorEntity[EnergyFlipUpdateCoordinator], SensorEntity):
CoordinatorEntity[DataUpdateCoordinator[dict[str, dict[str, Any]]]], SensorEntity
):
"""Defines a EnergyFlip sensor.""" """Defines a EnergyFlip sensor."""
entity_description: EnergyFlipSensorEntityDescription entity_description: EnergyFlipSensorEntityDescription
@ -243,7 +238,7 @@ class EnergyFlipSensor(
def __init__( def __init__(
self, self,
coordinator: DataUpdateCoordinator[dict[str, dict[str, Any]]], coordinator: EnergyFlipUpdateCoordinator,
user_id: str, user_id: str,
description: EnergyFlipSensorEntityDescription, description: EnergyFlipSensorEntityDescription,
) -> None: ) -> None: