mirror of
https://github.com/home-assistant/core.git
synced 2026-04-23 05:35:35 +00:00
Compare commits
11 Commits
dev
...
tibber_ref
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a9b984d705 | ||
|
|
886c0578e7 | ||
|
|
02e579c5ae | ||
|
|
d47f3ca1d8 | ||
|
|
02e5f2c234 | ||
|
|
e42195bfed | ||
|
|
b2944a6d66 | ||
|
|
03d15fb70c | ||
|
|
01d57ddcf1 | ||
|
|
cfc85cfd29 | ||
|
|
ca2dc20709 |
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
@@ -23,7 +23,7 @@ from homeassistant.helpers.typing import ConfigType
|
|||||||
from homeassistant.util import dt as dt_util, ssl as ssl_util
|
from homeassistant.util import dt as dt_util, ssl as ssl_util
|
||||||
|
|
||||||
from .const import AUTH_IMPLEMENTATION, DATA_HASS_CONFIG, DOMAIN, TibberConfigEntry
|
from .const import AUTH_IMPLEMENTATION, DATA_HASS_CONFIG, DOMAIN, TibberConfigEntry
|
||||||
from .coordinator import TibberDataAPICoordinator
|
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
|
||||||
from .services import async_setup_services
|
from .services import async_setup_services
|
||||||
|
|
||||||
PLATFORMS = [Platform.BINARY_SENSOR, Platform.NOTIFY, Platform.SENSOR]
|
PLATFORMS = [Platform.BINARY_SENSOR, Platform.NOTIFY, Platform.SENSOR]
|
||||||
@@ -38,7 +38,8 @@ class TibberRuntimeData:
|
|||||||
"""Runtime data for Tibber API entries."""
|
"""Runtime data for Tibber API entries."""
|
||||||
|
|
||||||
session: OAuth2Session
|
session: OAuth2Session
|
||||||
data_api_coordinator: TibberDataAPICoordinator | None = field(default=None)
|
data_api_coordinator: TibberDataAPICoordinator
|
||||||
|
data_coordinator: TibberDataCoordinator
|
||||||
_client: tibber.Tibber | None = None
|
_client: tibber.Tibber | None = None
|
||||||
|
|
||||||
async def async_get_client(self, hass: HomeAssistant) -> tibber.Tibber:
|
async def async_get_client(self, hass: HomeAssistant) -> tibber.Tibber:
|
||||||
@@ -100,8 +101,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bo
|
|||||||
except ClientError as err:
|
except ClientError as err:
|
||||||
raise ConfigEntryNotReady from err
|
raise ConfigEntryNotReady from err
|
||||||
|
|
||||||
|
data_api_coordinator = TibberDataAPICoordinator(hass, entry)
|
||||||
|
data_coordinator = TibberDataCoordinator(hass, entry)
|
||||||
entry.runtime_data = TibberRuntimeData(
|
entry.runtime_data = TibberRuntimeData(
|
||||||
session=session,
|
session=session,
|
||||||
|
data_api_coordinator=data_api_coordinator,
|
||||||
|
data_coordinator=data_coordinator,
|
||||||
)
|
)
|
||||||
|
|
||||||
tibber_connection = await entry.runtime_data.async_get_client(hass)
|
tibber_connection = await entry.runtime_data.async_get_client(hass)
|
||||||
@@ -124,9 +129,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bo
|
|||||||
except tibber.FatalHttpExceptionError as err:
|
except tibber.FatalHttpExceptionError as err:
|
||||||
raise ConfigEntryNotReady("Fatal HTTP error from Tibber API") from err
|
raise ConfigEntryNotReady("Fatal HTTP error from Tibber API") from err
|
||||||
|
|
||||||
coordinator = TibberDataAPICoordinator(hass, entry)
|
await data_api_coordinator.async_config_entry_first_refresh()
|
||||||
await coordinator.async_config_entry_first_refresh()
|
await data_coordinator.async_config_entry_first_refresh()
|
||||||
entry.runtime_data.data_api_coordinator = coordinator
|
|
||||||
|
|
||||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||||
return True
|
return True
|
||||||
|
|||||||
@@ -2,9 +2,11 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from datetime import timedelta
|
from collections.abc import Callable
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime, timedelta
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, cast
|
from typing import TYPE_CHECKING, Any, cast
|
||||||
|
|
||||||
from aiohttp.client_exceptions import ClientError
|
from aiohttp.client_exceptions import ClientError
|
||||||
import tibber
|
import tibber
|
||||||
@@ -21,9 +23,11 @@ from homeassistant.components.recorder.statistics import (
|
|||||||
get_last_statistics,
|
get_last_statistics,
|
||||||
statistics_during_period,
|
statistics_during_period,
|
||||||
)
|
)
|
||||||
from homeassistant.const import UnitOfEnergy
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, UnitOfEnergy
|
||||||
|
from homeassistant.core import Event, HomeAssistant, callback
|
||||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||||
|
from homeassistant.helpers.event import async_track_point_in_utc_time
|
||||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||||
from homeassistant.util import dt as dt_util
|
from homeassistant.util import dt as dt_util
|
||||||
from homeassistant.util.unit_conversion import EnergyConverter
|
from homeassistant.util.unit_conversion import EnergyConverter
|
||||||
@@ -31,6 +35,8 @@ from homeassistant.util.unit_conversion import EnergyConverter
|
|||||||
from .const import DOMAIN
|
from .const import DOMAIN
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
from tibber import TibberHome
|
||||||
|
|
||||||
from .const import TibberConfigEntry
|
from .const import TibberConfigEntry
|
||||||
|
|
||||||
FIVE_YEARS = 5 * 365 * 24
|
FIVE_YEARS = 5 * 365 * 24
|
||||||
@@ -38,7 +44,55 @@ FIVE_YEARS = 5 * 365 * 24
|
|||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
@dataclass
|
||||||
|
class TibberHomeData:
|
||||||
|
"""Structured data per Tibber home from GraphQL and price API."""
|
||||||
|
|
||||||
|
currency: str
|
||||||
|
price_unit: str
|
||||||
|
current_price: float | None
|
||||||
|
current_price_time: datetime | None
|
||||||
|
intraday_price_ranking: float | None
|
||||||
|
max_price: float
|
||||||
|
avg_price: float
|
||||||
|
min_price: float
|
||||||
|
off_peak_1: float
|
||||||
|
peak: float
|
||||||
|
off_peak_2: float
|
||||||
|
month_cost: float | None
|
||||||
|
peak_hour: float | None
|
||||||
|
peak_hour_time: datetime | None
|
||||||
|
month_cons: float | None
|
||||||
|
|
||||||
|
def __getitem__(self, key: str) -> Any:
|
||||||
|
"""Return attribute by name, or None if missing."""
|
||||||
|
return self.__dict__.get(key)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_home_data(home: TibberHome) -> TibberHomeData:
|
||||||
|
"""Build TibberHomeData from a TibberHome after price info has been fetched."""
|
||||||
|
price_value, price_time, price_rank = home.current_price_data()
|
||||||
|
attrs = home.current_attributes()
|
||||||
|
return TibberHomeData(
|
||||||
|
currency=home.currency,
|
||||||
|
price_unit=home.price_unit,
|
||||||
|
current_price=price_value,
|
||||||
|
current_price_time=price_time,
|
||||||
|
intraday_price_ranking=price_rank,
|
||||||
|
max_price=attrs.get("max_price", 0.0),
|
||||||
|
avg_price=attrs.get("avg_price", 0.0),
|
||||||
|
min_price=attrs.get("min_price", 0.0),
|
||||||
|
off_peak_1=attrs.get("off_peak_1", 0.0),
|
||||||
|
peak=attrs.get("peak", 0.0),
|
||||||
|
off_peak_2=attrs.get("off_peak_2", 0.0),
|
||||||
|
month_cost=getattr(home, "month_cost", None),
|
||||||
|
peak_hour=getattr(home, "peak_hour", None),
|
||||||
|
peak_hour_time=getattr(home, "peak_hour_time", None),
|
||||||
|
month_cons=getattr(home, "month_cons", None),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TibberDataCoordinator(DataUpdateCoordinator[dict[str, TibberHomeData]]):
|
||||||
"""Handle Tibber data and insert statistics."""
|
"""Handle Tibber data and insert statistics."""
|
||||||
|
|
||||||
config_entry: TibberConfigEntry
|
config_entry: TibberConfigEntry
|
||||||
@@ -46,36 +100,84 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
|||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
config_entry: TibberConfigEntry,
|
entry: TibberConfigEntry,
|
||||||
tibber_connection: tibber.Tibber,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Initialize the data handler."""
|
"""Initialize the data handler."""
|
||||||
super().__init__(
|
super().__init__(
|
||||||
hass,
|
hass,
|
||||||
_LOGGER,
|
_LOGGER,
|
||||||
config_entry=config_entry,
|
config_entry=entry,
|
||||||
name=f"Tibber {tibber_connection.name}",
|
name="Tibber",
|
||||||
update_interval=timedelta(minutes=20),
|
|
||||||
)
|
)
|
||||||
self._tibber_connection = tibber_connection
|
self._listener_unsub: Callable[[], None] | None = None
|
||||||
|
|
||||||
async def _async_update_data(self) -> None:
|
def _get_next_15_interval(self) -> datetime:
|
||||||
"""Update data via API."""
|
"""Return the next 15-minute boundary (minutes 0, 15, 30, 45) in UTC."""
|
||||||
|
next_run = dt_util.utcnow() + timedelta(minutes=15)
|
||||||
|
next_minute = (next_run.minute // 15) * 15
|
||||||
|
return next_run.replace(
|
||||||
|
minute=next_minute, second=0, microsecond=0, tzinfo=dt_util.UTC
|
||||||
|
)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _on_scheduled_refresh(self, _fire_time: datetime) -> None:
|
||||||
|
"""Run the scheduled refresh (same contract as base refresh interval)."""
|
||||||
|
self.config_entry.async_create_background_task(
|
||||||
|
self.hass,
|
||||||
|
self._handle_refresh_interval(),
|
||||||
|
name=f"{self.name} - {self.config_entry.title} - refresh",
|
||||||
|
eager_start=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _schedule_refresh(self) -> None:
|
||||||
|
"""Schedule a refresh at the next 15-minute boundary."""
|
||||||
|
if self.config_entry.pref_disable_polling:
|
||||||
|
return
|
||||||
|
self._async_unsub_refresh()
|
||||||
|
self._unsub_refresh = async_track_point_in_utc_time(
|
||||||
|
self.hass,
|
||||||
|
self._on_scheduled_refresh,
|
||||||
|
self._get_next_15_interval(),
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _async_update_data(self) -> dict[str, TibberHomeData]:
|
||||||
|
"""Update data via API and return per-home data for sensors."""
|
||||||
|
_LOGGER.error("Updating data")
|
||||||
|
tibber_connection = await self.config_entry.runtime_data.async_get_client(
|
||||||
|
self.hass
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
await self._tibber_connection.fetch_consumption_data_active_homes()
|
await tibber_connection.fetch_consumption_data_active_homes()
|
||||||
await self._tibber_connection.fetch_production_data_active_homes()
|
await tibber_connection.fetch_production_data_active_homes()
|
||||||
await self._insert_statistics()
|
now = dt_util.now()
|
||||||
|
for home in tibber_connection.get_homes(only_active=True):
|
||||||
|
update_needed = False
|
||||||
|
last_data_timestamp = home.last_data_timestamp
|
||||||
|
|
||||||
|
if last_data_timestamp is None:
|
||||||
|
update_needed = True
|
||||||
|
else:
|
||||||
|
remaining_seconds = (last_data_timestamp - now).total_seconds()
|
||||||
|
if remaining_seconds < 11 * 3600:
|
||||||
|
update_needed = True
|
||||||
|
|
||||||
|
if update_needed:
|
||||||
|
await home.update_info_and_price_info()
|
||||||
|
await self._insert_statistics(tibber_connection)
|
||||||
except tibber.RetryableHttpExceptionError as err:
|
except tibber.RetryableHttpExceptionError as err:
|
||||||
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
||||||
except tibber.FatalHttpExceptionError:
|
except tibber.FatalHttpExceptionError as err:
|
||||||
# Fatal error. Reload config entry to show correct error.
|
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
||||||
self.hass.async_create_task(
|
|
||||||
self.hass.config_entries.async_reload(self.config_entry.entry_id)
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _insert_statistics(self) -> None:
|
result: dict[str, TibberHomeData] = {}
|
||||||
|
for home in tibber_connection.get_homes(only_active=True):
|
||||||
|
result[home.home_id] = _build_home_data(home)
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def _insert_statistics(self, tibber_connection: tibber.Tibber) -> None:
|
||||||
"""Insert Tibber statistics."""
|
"""Insert Tibber statistics."""
|
||||||
for home in self._tibber_connection.get_homes():
|
for home in tibber_connection.get_homes():
|
||||||
sensors: list[tuple[str, bool, str | None, str]] = []
|
sensors: list[tuple[str, bool, str | None, str]] = []
|
||||||
if home.hourly_consumption_data:
|
if home.hourly_consumption_data:
|
||||||
sensors.append(
|
sensors.append(
|
||||||
@@ -212,7 +314,6 @@ class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
|
|||||||
update_interval=timedelta(minutes=1),
|
update_interval=timedelta(minutes=1),
|
||||||
config_entry=entry,
|
config_entry=entry,
|
||||||
)
|
)
|
||||||
self._runtime_data = entry.runtime_data
|
|
||||||
self.sensors_by_device: dict[str, dict[str, tibber.data_api.Sensor]] = {}
|
self.sensors_by_device: dict[str, dict[str, tibber.data_api.Sensor]] = {}
|
||||||
|
|
||||||
def _build_sensor_lookup(self, devices: dict[str, TibberDevice]) -> None:
|
def _build_sensor_lookup(self, devices: dict[str, TibberDevice]) -> None:
|
||||||
@@ -233,7 +334,7 @@ class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
|
|||||||
async def _async_get_client(self) -> tibber.Tibber:
|
async def _async_get_client(self) -> tibber.Tibber:
|
||||||
"""Get the Tibber client with error handling."""
|
"""Get the Tibber client with error handling."""
|
||||||
try:
|
try:
|
||||||
return await self._runtime_data.async_get_client(self.hass)
|
return await self.config_entry.runtime_data.async_get_client(self.hass)
|
||||||
except ConfigEntryAuthFailed:
|
except ConfigEntryAuthFailed:
|
||||||
raise
|
raise
|
||||||
except (ClientError, TimeoutError, tibber.UserAgentMissingError) as err:
|
except (ClientError, TimeoutError, tibber.UserAgentMissingError) as err:
|
||||||
@@ -257,3 +358,48 @@ class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
|
|||||||
) from err
|
) from err
|
||||||
self._build_sensor_lookup(devices)
|
self._build_sensor_lookup(devices)
|
||||||
return devices
|
return devices
|
||||||
|
|
||||||
|
|
||||||
|
class TibberRtDataCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||||
|
"""Handle Tibber realtime data."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
hass: HomeAssistant,
|
||||||
|
config_entry: ConfigEntry,
|
||||||
|
add_sensor_callback: Callable[[TibberRtDataCoordinator, Any], None],
|
||||||
|
tibber_home: TibberHome,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the data handler."""
|
||||||
|
self._add_sensor_callback = add_sensor_callback
|
||||||
|
super().__init__(
|
||||||
|
hass,
|
||||||
|
_LOGGER,
|
||||||
|
config_entry=config_entry,
|
||||||
|
name=tibber_home.info["viewer"]["home"]["address"].get(
|
||||||
|
"address1", "Tibber"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
self._async_remove_device_updates_handler = self.async_add_listener(
|
||||||
|
self._data_updated
|
||||||
|
)
|
||||||
|
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _handle_ha_stop(self, _event: Event) -> None:
|
||||||
|
"""Handle Home Assistant stopping."""
|
||||||
|
self._async_remove_device_updates_handler()
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _data_updated(self) -> None:
|
||||||
|
"""Triggered when data is updated."""
|
||||||
|
if live_measurement := self.get_live_measurement():
|
||||||
|
self._add_sensor_callback(self, live_measurement)
|
||||||
|
|
||||||
|
def get_live_measurement(self) -> Any:
|
||||||
|
"""Get live measurement data."""
|
||||||
|
if errors := self.data.get("errors"):
|
||||||
|
_LOGGER.error(errors[0])
|
||||||
|
return None
|
||||||
|
return self.data.get("data", {}).get("liveMeasurement")
|
||||||
|
|||||||
117
homeassistant/components/tibber/entity.py
Normal file
117
homeassistant/components/tibber/entity.py
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
"""Shared entity base for Tibber sensors."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import TYPE_CHECKING, cast
|
||||||
|
|
||||||
|
from homeassistant.components.sensor import SensorEntityDescription
|
||||||
|
from homeassistant.core import callback
|
||||||
|
from homeassistant.helpers.device_registry import DeviceInfo
|
||||||
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||||
|
from homeassistant.util import dt as dt_util
|
||||||
|
|
||||||
|
from .const import DOMAIN
|
||||||
|
from .coordinator import TibberDataCoordinator, TibberHomeData, TibberRtDataCoordinator
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tibber import TibberHome
|
||||||
|
|
||||||
|
|
||||||
|
class TibberCoordinatorEntity(CoordinatorEntity[TibberDataCoordinator]):
|
||||||
|
"""Base entity for Tibber sensors using TibberDataCoordinator."""
|
||||||
|
|
||||||
|
_attr_has_entity_name = True
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
coordinator: TibberDataCoordinator,
|
||||||
|
tibber_home: TibberHome,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the entity."""
|
||||||
|
super().__init__(coordinator)
|
||||||
|
self._tibber_home = tibber_home
|
||||||
|
self._home_name: str = tibber_home.name or tibber_home.home_id
|
||||||
|
self._device_name: str = self._home_name
|
||||||
|
self._attr_device_info = DeviceInfo(
|
||||||
|
identifiers={(DOMAIN, self._tibber_home.home_id)},
|
||||||
|
name=self._device_name,
|
||||||
|
model="Tibber Pulse",
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_home_data(self) -> TibberHomeData | None:
|
||||||
|
"""Return cached home data from the coordinator."""
|
||||||
|
data = cast(dict[str, TibberHomeData] | None, self.coordinator.data)
|
||||||
|
if data is None:
|
||||||
|
return None
|
||||||
|
return data.get(self._tibber_home.home_id)
|
||||||
|
|
||||||
|
|
||||||
|
class TibberRTCoordinatorEntity(CoordinatorEntity[TibberRtDataCoordinator]):
|
||||||
|
"""Representation of a Tibber sensor for real time consumption."""
|
||||||
|
|
||||||
|
_attr_has_entity_name = True
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
tibber_home: TibberHome,
|
||||||
|
description: SensorEntityDescription,
|
||||||
|
initial_state: float,
|
||||||
|
coordinator: TibberRtDataCoordinator,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the sensor."""
|
||||||
|
super().__init__(coordinator)
|
||||||
|
self._tibber_home = tibber_home
|
||||||
|
self._home_name: str = tibber_home.name or tibber_home.home_id
|
||||||
|
model: str = "Tibber Pulse"
|
||||||
|
self._device_name: str = f"{model} {self._home_name}"
|
||||||
|
self.entity_description = description
|
||||||
|
|
||||||
|
self._attr_device_info = DeviceInfo(
|
||||||
|
identifiers={(DOMAIN, self._tibber_home.home_id)},
|
||||||
|
name=self._device_name,
|
||||||
|
model=model,
|
||||||
|
)
|
||||||
|
self._attr_native_value = initial_state
|
||||||
|
self._attr_last_reset: datetime | None = None
|
||||||
|
self._attr_unique_id = f"{self._tibber_home.home_id}_rt_{description.key}"
|
||||||
|
|
||||||
|
if description.key in ("accumulatedCost", "accumulatedReward"):
|
||||||
|
self._attr_native_unit_of_measurement = tibber_home.currency
|
||||||
|
|
||||||
|
@property
|
||||||
|
def available(self) -> bool:
|
||||||
|
"""Return True if entity is available."""
|
||||||
|
return self._tibber_home.rt_subscription_running
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _handle_coordinator_update(self) -> None:
|
||||||
|
if not (live_measurement := self.coordinator.get_live_measurement()):
|
||||||
|
return
|
||||||
|
state = live_measurement.get(self.entity_description.key)
|
||||||
|
if state is None:
|
||||||
|
return
|
||||||
|
if self.entity_description.key in (
|
||||||
|
"accumulatedConsumption",
|
||||||
|
"accumulatedProduction",
|
||||||
|
):
|
||||||
|
# Value is reset to 0 at midnight, but not always strictly increasing
|
||||||
|
# due to hourly corrections.
|
||||||
|
# If device is offline, last_reset should be updated when it comes
|
||||||
|
# back online if the value has decreased
|
||||||
|
ts_local = dt_util.parse_datetime(live_measurement["timestamp"])
|
||||||
|
if ts_local is not None:
|
||||||
|
if self._attr_last_reset is None or (
|
||||||
|
state < 0.5 * self._attr_native_value
|
||||||
|
and (
|
||||||
|
ts_local.hour == 0
|
||||||
|
or (ts_local - self._attr_last_reset) > timedelta(hours=24)
|
||||||
|
)
|
||||||
|
):
|
||||||
|
self._attr_last_reset = dt_util.as_utc(
|
||||||
|
ts_local.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||||
|
)
|
||||||
|
if self.entity_description.key == "powerFactor":
|
||||||
|
state *= 100.0
|
||||||
|
self._attr_native_value = state
|
||||||
|
self.async_write_ha_state()
|
||||||
@@ -2,12 +2,8 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from collections.abc import Callable
|
|
||||||
import datetime
|
|
||||||
from datetime import timedelta
|
|
||||||
import logging
|
import logging
|
||||||
from random import randrange
|
from typing import Any, cast
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from tibber import FatalHttpExceptionError, RetryableHttpExceptionError, TibberHome
|
from tibber import FatalHttpExceptionError, RetryableHttpExceptionError, TibberHome
|
||||||
@@ -19,9 +15,7 @@ from homeassistant.components.sensor import (
|
|||||||
SensorEntityDescription,
|
SensorEntityDescription,
|
||||||
SensorStateClass,
|
SensorStateClass,
|
||||||
)
|
)
|
||||||
from homeassistant.config_entries import ConfigEntry
|
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
EVENT_HOMEASSISTANT_STOP,
|
|
||||||
PERCENTAGE,
|
PERCENTAGE,
|
||||||
SIGNAL_STRENGTH_DECIBELS,
|
SIGNAL_STRENGTH_DECIBELS,
|
||||||
EntityCategory,
|
EntityCategory,
|
||||||
@@ -32,28 +26,25 @@ from homeassistant.const import (
|
|||||||
UnitOfPower,
|
UnitOfPower,
|
||||||
UnitOfTemperature,
|
UnitOfTemperature,
|
||||||
)
|
)
|
||||||
from homeassistant.core import Event, HomeAssistant, callback
|
from homeassistant.core import HomeAssistant, callback
|
||||||
from homeassistant.exceptions import PlatformNotReady
|
from homeassistant.exceptions import PlatformNotReady
|
||||||
from homeassistant.helpers import entity_registry as er
|
from homeassistant.helpers import entity_registry as er
|
||||||
from homeassistant.helpers.device_registry import DeviceInfo
|
from homeassistant.helpers.device_registry import DeviceInfo
|
||||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||||
from homeassistant.helpers.typing import StateType
|
from homeassistant.helpers.typing import StateType
|
||||||
from homeassistant.helpers.update_coordinator import (
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||||
CoordinatorEntity,
|
|
||||||
DataUpdateCoordinator,
|
|
||||||
)
|
|
||||||
from homeassistant.util import Throttle, dt as dt_util
|
|
||||||
|
|
||||||
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
|
from .const import DOMAIN, TibberConfigEntry
|
||||||
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
|
from .coordinator import (
|
||||||
|
TibberDataAPICoordinator,
|
||||||
|
TibberDataCoordinator,
|
||||||
|
TibberRtDataCoordinator,
|
||||||
|
)
|
||||||
|
from .entity import TibberCoordinatorEntity, TibberRTCoordinatorEntity
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
ICON = "mdi:currency-usd"
|
|
||||||
SCAN_INTERVAL = timedelta(minutes=1)
|
|
||||||
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
|
|
||||||
PARALLEL_UPDATES = 0
|
PARALLEL_UPDATES = 0
|
||||||
TWENTY_MINUTES = 20 * 60
|
|
||||||
|
|
||||||
RT_SENSORS_UNIQUE_ID_MIGRATION = {
|
RT_SENSORS_UNIQUE_ID_MIGRATION = {
|
||||||
"accumulated_consumption_last_hour": "accumulated consumption current hour",
|
"accumulated_consumption_last_hour": "accumulated consumption current hour",
|
||||||
@@ -260,9 +251,56 @@ SENSORS: tuple[SensorEntityDescription, ...] = (
|
|||||||
native_unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR,
|
native_unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR,
|
||||||
state_class=SensorStateClass.TOTAL_INCREASING,
|
state_class=SensorStateClass.TOTAL_INCREASING,
|
||||||
),
|
),
|
||||||
|
SensorEntityDescription(
|
||||||
|
key="current_price",
|
||||||
|
translation_key="electricity_price",
|
||||||
|
state_class=SensorStateClass.MEASUREMENT,
|
||||||
|
suggested_display_precision=3,
|
||||||
|
),
|
||||||
|
SensorEntityDescription(
|
||||||
|
key="max_price",
|
||||||
|
translation_key="max_price",
|
||||||
|
state_class=SensorStateClass.MEASUREMENT,
|
||||||
|
suggested_display_precision=3,
|
||||||
|
),
|
||||||
|
SensorEntityDescription(
|
||||||
|
key="avg_price",
|
||||||
|
translation_key="avg_price",
|
||||||
|
state_class=SensorStateClass.MEASUREMENT,
|
||||||
|
suggested_display_precision=3,
|
||||||
|
),
|
||||||
|
SensorEntityDescription(
|
||||||
|
key="min_price",
|
||||||
|
translation_key="min_price",
|
||||||
|
state_class=SensorStateClass.MEASUREMENT,
|
||||||
|
suggested_display_precision=3,
|
||||||
|
),
|
||||||
|
SensorEntityDescription(
|
||||||
|
key="off_peak_1",
|
||||||
|
translation_key="off_peak_1",
|
||||||
|
state_class=SensorStateClass.MEASUREMENT,
|
||||||
|
suggested_display_precision=3,
|
||||||
|
),
|
||||||
|
SensorEntityDescription(
|
||||||
|
key="peak",
|
||||||
|
translation_key="peak",
|
||||||
|
state_class=SensorStateClass.MEASUREMENT,
|
||||||
|
suggested_display_precision=3,
|
||||||
|
),
|
||||||
|
SensorEntityDescription(
|
||||||
|
key="off_peak_2",
|
||||||
|
translation_key="off_peak_2",
|
||||||
|
state_class=SensorStateClass.MEASUREMENT,
|
||||||
|
suggested_display_precision=3,
|
||||||
|
),
|
||||||
|
SensorEntityDescription(
|
||||||
|
key="intraday_price_ranking",
|
||||||
|
translation_key="intraday_price_ranking",
|
||||||
|
state_class=SensorStateClass.MEASUREMENT,
|
||||||
|
suggested_display_precision=2,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
DATA_API_SENSORS: tuple[SensorEntityDescription, ...] = (
|
DATA_API_SENSORS: tuple[SensorEntityDescription, ...] = (
|
||||||
SensorEntityDescription(
|
SensorEntityDescription(
|
||||||
key="cellular.rssi",
|
key="cellular.rssi",
|
||||||
@@ -609,7 +647,7 @@ async def _async_setup_graphql_sensors(
|
|||||||
|
|
||||||
entity_registry = er.async_get(hass)
|
entity_registry = er.async_get(hass)
|
||||||
|
|
||||||
coordinator: TibberDataCoordinator | None = None
|
coordinator: TibberDataCoordinator = entry.runtime_data.data_coordinator
|
||||||
entities: list[TibberSensor] = []
|
entities: list[TibberSensor] = []
|
||||||
for home in tibber_connection.get_homes(only_active=False):
|
for home in tibber_connection.get_homes(only_active=False):
|
||||||
try:
|
try:
|
||||||
@@ -626,13 +664,7 @@ async def _async_setup_graphql_sensors(
|
|||||||
raise PlatformNotReady from err
|
raise PlatformNotReady from err
|
||||||
|
|
||||||
if home.has_active_subscription:
|
if home.has_active_subscription:
|
||||||
entities.append(TibberSensorElPrice(home))
|
entities.extend(TibberSensor(home, coordinator, desc) for desc in SENSORS)
|
||||||
if coordinator is None:
|
|
||||||
coordinator = TibberDataCoordinator(hass, entry, tibber_connection)
|
|
||||||
entities.extend(
|
|
||||||
TibberDataSensor(home, coordinator, entity_description)
|
|
||||||
for entity_description in SENSORS
|
|
||||||
)
|
|
||||||
|
|
||||||
if home.has_real_time_consumption:
|
if home.has_real_time_consumption:
|
||||||
entity_creator = TibberRtEntityCreator(
|
entity_creator = TibberRtEntityCreator(
|
||||||
@@ -657,8 +689,6 @@ def _setup_data_api_sensors(
|
|||||||
"""Set up sensors backed by the Tibber Data API."""
|
"""Set up sensors backed by the Tibber Data API."""
|
||||||
|
|
||||||
coordinator = entry.runtime_data.data_api_coordinator
|
coordinator = entry.runtime_data.data_api_coordinator
|
||||||
if coordinator is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
entities: list[TibberDataAPISensor] = []
|
entities: list[TibberDataAPISensor] = []
|
||||||
api_sensors = {sensor.key: sensor for sensor in DATA_API_SENSORS}
|
api_sensors = {sensor.key: sensor for sensor in DATA_API_SENSORS}
|
||||||
@@ -707,116 +737,8 @@ class TibberDataAPISensor(CoordinatorEntity[TibberDataAPICoordinator], SensorEnt
|
|||||||
return sensor.value if sensor else None
|
return sensor.value if sensor else None
|
||||||
|
|
||||||
|
|
||||||
class TibberSensor(SensorEntity):
|
class TibberSensor(TibberCoordinatorEntity, SensorEntity):
|
||||||
"""Representation of a generic Tibber sensor."""
|
"""Representation of a Tibber sensor reading from coordinator data."""
|
||||||
|
|
||||||
_attr_has_entity_name = True
|
|
||||||
|
|
||||||
def __init__(self, *args: Any, tibber_home: TibberHome, **kwargs: Any) -> None:
|
|
||||||
"""Initialize the sensor."""
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self._tibber_home = tibber_home
|
|
||||||
self._home_name = tibber_home.info["viewer"]["home"]["appNickname"]
|
|
||||||
if self._home_name is None:
|
|
||||||
self._home_name = tibber_home.info["viewer"]["home"]["address"].get(
|
|
||||||
"address1", ""
|
|
||||||
)
|
|
||||||
self._device_name: str | None = None
|
|
||||||
self._model: str | None = None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def device_info(self) -> DeviceInfo:
|
|
||||||
"""Return the device_info of the device."""
|
|
||||||
device_info = DeviceInfo(
|
|
||||||
identifiers={(DOMAIN, self._tibber_home.home_id)},
|
|
||||||
name=self._device_name,
|
|
||||||
manufacturer=MANUFACTURER,
|
|
||||||
)
|
|
||||||
if self._model is not None:
|
|
||||||
device_info["model"] = self._model
|
|
||||||
return device_info
|
|
||||||
|
|
||||||
|
|
||||||
class TibberSensorElPrice(TibberSensor):
|
|
||||||
"""Representation of a Tibber sensor for el price."""
|
|
||||||
|
|
||||||
_attr_state_class = SensorStateClass.MEASUREMENT
|
|
||||||
_attr_translation_key = "electricity_price"
|
|
||||||
|
|
||||||
def __init__(self, tibber_home: TibberHome) -> None:
|
|
||||||
"""Initialize the sensor."""
|
|
||||||
super().__init__(tibber_home=tibber_home)
|
|
||||||
self._last_updated: datetime.datetime | None = None
|
|
||||||
self._spread_load_constant = randrange(TWENTY_MINUTES)
|
|
||||||
|
|
||||||
self._attr_available = False
|
|
||||||
self._attr_extra_state_attributes = {
|
|
||||||
"app_nickname": None,
|
|
||||||
"grid_company": None,
|
|
||||||
"estimated_annual_consumption": None,
|
|
||||||
"max_price": None,
|
|
||||||
"avg_price": None,
|
|
||||||
"min_price": None,
|
|
||||||
"off_peak_1": None,
|
|
||||||
"peak": None,
|
|
||||||
"off_peak_2": None,
|
|
||||||
"intraday_price_ranking": None,
|
|
||||||
}
|
|
||||||
self._attr_icon = ICON
|
|
||||||
self._attr_unique_id = self._tibber_home.home_id
|
|
||||||
self._model = "Price Sensor"
|
|
||||||
|
|
||||||
self._device_name = self._home_name
|
|
||||||
|
|
||||||
async def async_update(self) -> None:
|
|
||||||
"""Get the latest data and updates the states."""
|
|
||||||
now = dt_util.now()
|
|
||||||
if (
|
|
||||||
not self._tibber_home.last_data_timestamp
|
|
||||||
or (self._tibber_home.last_data_timestamp - now).total_seconds()
|
|
||||||
< 10 * 3600 - self._spread_load_constant
|
|
||||||
or not self.available
|
|
||||||
):
|
|
||||||
_LOGGER.debug("Asking for new data")
|
|
||||||
await self._fetch_data()
|
|
||||||
|
|
||||||
elif (
|
|
||||||
self._tibber_home.price_total
|
|
||||||
and self._last_updated
|
|
||||||
and self._last_updated.hour == now.hour
|
|
||||||
and now - self._last_updated < timedelta(minutes=15)
|
|
||||||
and self._tibber_home.last_data_timestamp
|
|
||||||
):
|
|
||||||
return
|
|
||||||
|
|
||||||
res = self._tibber_home.current_price_data()
|
|
||||||
self._attr_native_value, self._last_updated, price_rank = res
|
|
||||||
self._attr_extra_state_attributes["intraday_price_ranking"] = price_rank
|
|
||||||
|
|
||||||
attrs = self._tibber_home.current_attributes()
|
|
||||||
self._attr_extra_state_attributes.update(attrs)
|
|
||||||
self._attr_available = self._attr_native_value is not None
|
|
||||||
self._attr_native_unit_of_measurement = self._tibber_home.price_unit
|
|
||||||
|
|
||||||
@Throttle(MIN_TIME_BETWEEN_UPDATES)
|
|
||||||
async def _fetch_data(self) -> None:
|
|
||||||
_LOGGER.debug("Fetching data")
|
|
||||||
try:
|
|
||||||
await self._tibber_home.update_info_and_price_info()
|
|
||||||
except TimeoutError, aiohttp.ClientError:
|
|
||||||
return
|
|
||||||
data = self._tibber_home.info["viewer"]["home"]
|
|
||||||
self._attr_extra_state_attributes["app_nickname"] = data["appNickname"]
|
|
||||||
self._attr_extra_state_attributes["grid_company"] = data["meteringPointData"][
|
|
||||||
"gridCompany"
|
|
||||||
]
|
|
||||||
self._attr_extra_state_attributes["estimated_annual_consumption"] = data[
|
|
||||||
"meteringPointData"
|
|
||||||
]["estimatedAnnualConsumption"]
|
|
||||||
|
|
||||||
|
|
||||||
class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
|
|
||||||
"""Representation of a Tibber sensor."""
|
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -827,80 +749,55 @@ class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
|
|||||||
"""Initialize the sensor."""
|
"""Initialize the sensor."""
|
||||||
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
|
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
|
||||||
self.entity_description = entity_description
|
self.entity_description = entity_description
|
||||||
|
if self.entity_description.key == "current_price":
|
||||||
self._attr_unique_id = (
|
# Preserve the existing unique ID for the electricity price
|
||||||
f"{self._tibber_home.home_id}_{self.entity_description.key}"
|
# entity to avoid breaking user setups.
|
||||||
)
|
self._attr_unique_id = self._tibber_home.home_id
|
||||||
if entity_description.key == "month_cost":
|
else:
|
||||||
self._attr_native_unit_of_measurement = self._tibber_home.currency
|
self._attr_unique_id = (
|
||||||
|
f"{self._tibber_home.home_id}_{self.entity_description.key}"
|
||||||
|
)
|
||||||
self._device_name = self._home_name
|
self._device_name = self._home_name
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def native_value(self) -> StateType:
|
def available(self) -> bool:
|
||||||
"""Return the value of the sensor."""
|
"""Return whether the sensor is available."""
|
||||||
return getattr(self._tibber_home, self.entity_description.key) # type: ignore[no-any-return]
|
return super().available and self._get_home_data() is not None
|
||||||
|
|
||||||
|
|
||||||
class TibberSensorRT(TibberSensor, CoordinatorEntity["TibberRtDataCoordinator"]):
|
|
||||||
"""Representation of a Tibber sensor for real time consumption."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
tibber_home: TibberHome,
|
|
||||||
description: SensorEntityDescription,
|
|
||||||
initial_state: float,
|
|
||||||
coordinator: TibberRtDataCoordinator,
|
|
||||||
) -> None:
|
|
||||||
"""Initialize the sensor."""
|
|
||||||
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
|
|
||||||
self.entity_description = description
|
|
||||||
self._model = "Tibber Pulse"
|
|
||||||
self._device_name = f"{self._model} {self._home_name}"
|
|
||||||
|
|
||||||
self._attr_native_value = initial_state
|
|
||||||
self._attr_unique_id = f"{self._tibber_home.home_id}_rt_{description.key}"
|
|
||||||
|
|
||||||
if description.key in ("accumulatedCost", "accumulatedReward"):
|
|
||||||
self._attr_native_unit_of_measurement = tibber_home.currency
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def available(self) -> bool:
|
def native_value(self) -> StateType:
|
||||||
"""Return True if entity is available."""
|
"""Return the value of the sensor from coordinator data."""
|
||||||
return self._tibber_home.rt_subscription_running
|
_LOGGER.error("native_value: %s", self.entity_description.key)
|
||||||
|
home_data = self._get_home_data()
|
||||||
|
if home_data is None:
|
||||||
|
return None
|
||||||
|
return cast(StateType, home_data[self.entity_description.key])
|
||||||
|
|
||||||
@callback
|
@property
|
||||||
def _handle_coordinator_update(self) -> None:
|
def native_unit_of_measurement(self) -> str | None:
|
||||||
if not (live_measurement := self.coordinator.get_live_measurement()):
|
"""Return the unit from coordinator data for monetary sensors."""
|
||||||
return
|
if (
|
||||||
state = live_measurement.get(self.entity_description.key)
|
self.entity_description.device_class == SensorDeviceClass.MONETARY
|
||||||
if state is None:
|
or self.entity_description.key
|
||||||
return
|
in (
|
||||||
if self.entity_description.key in (
|
"current_price",
|
||||||
"accumulatedConsumption",
|
"max_price",
|
||||||
"accumulatedProduction",
|
"avg_price",
|
||||||
|
"min_price",
|
||||||
|
"off_peak_1",
|
||||||
|
"peak",
|
||||||
|
"off_peak_2",
|
||||||
|
)
|
||||||
):
|
):
|
||||||
# Value is reset to 0 at midnight, but not always strictly increasing
|
home_data = self._get_home_data()
|
||||||
# due to hourly corrections.
|
if home_data is None:
|
||||||
# If device is offline, last_reset should be updated when it comes
|
return None
|
||||||
# back online if the value has decreased
|
return (
|
||||||
ts_local = dt_util.parse_datetime(live_measurement["timestamp"])
|
home_data.currency
|
||||||
if ts_local is not None:
|
if self.entity_description.device_class == SensorDeviceClass.MONETARY
|
||||||
if self.last_reset is None or (
|
else home_data.price_unit
|
||||||
# native_value is float
|
)
|
||||||
state < 0.5 * self.native_value # type: ignore[operator]
|
return self.entity_description.native_unit_of_measurement
|
||||||
and (
|
|
||||||
ts_local.hour == 0
|
|
||||||
or (ts_local - self.last_reset) > timedelta(hours=24)
|
|
||||||
)
|
|
||||||
):
|
|
||||||
self._attr_last_reset = dt_util.as_utc(
|
|
||||||
ts_local.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
||||||
)
|
|
||||||
if self.entity_description.key == "powerFactor":
|
|
||||||
state *= 100.0
|
|
||||||
self._attr_native_value = state
|
|
||||||
self.async_write_ha_state()
|
|
||||||
|
|
||||||
|
|
||||||
class TibberRtEntityCreator:
|
class TibberRtEntityCreator:
|
||||||
@@ -975,7 +872,7 @@ class TibberRtEntityCreator:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
self._migrate_unique_id(sensor_description)
|
self._migrate_unique_id(sensor_description)
|
||||||
entity = TibberSensorRT(
|
entity = TibberRTCoordinatorEntity(
|
||||||
self._tibber_home,
|
self._tibber_home,
|
||||||
sensor_description,
|
sensor_description,
|
||||||
state,
|
state,
|
||||||
@@ -985,48 +882,3 @@ class TibberRtEntityCreator:
|
|||||||
self._added_sensors.add(sensor_description.key)
|
self._added_sensors.add(sensor_description.key)
|
||||||
if new_entities:
|
if new_entities:
|
||||||
self._async_add_entities(new_entities)
|
self._async_add_entities(new_entities)
|
||||||
|
|
||||||
|
|
||||||
class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-enforce-class-module
|
|
||||||
"""Handle Tibber realtime data."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
hass: HomeAssistant,
|
|
||||||
config_entry: ConfigEntry,
|
|
||||||
add_sensor_callback: Callable[[TibberRtDataCoordinator, Any], None],
|
|
||||||
tibber_home: TibberHome,
|
|
||||||
) -> None:
|
|
||||||
"""Initialize the data handler."""
|
|
||||||
self._add_sensor_callback = add_sensor_callback
|
|
||||||
super().__init__(
|
|
||||||
hass,
|
|
||||||
_LOGGER,
|
|
||||||
config_entry=config_entry,
|
|
||||||
name=tibber_home.info["viewer"]["home"]["address"].get(
|
|
||||||
"address1", "Tibber"
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
self._async_remove_device_updates_handler = self.async_add_listener(
|
|
||||||
self._data_updated
|
|
||||||
)
|
|
||||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def _handle_ha_stop(self, _event: Event) -> None:
|
|
||||||
"""Handle Home Assistant stopping."""
|
|
||||||
self._async_remove_device_updates_handler()
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def _data_updated(self) -> None:
|
|
||||||
"""Triggered when data is updated."""
|
|
||||||
if live_measurement := self.get_live_measurement():
|
|
||||||
self._add_sensor_callback(self, live_measurement)
|
|
||||||
|
|
||||||
def get_live_measurement(self) -> Any:
|
|
||||||
"""Get live measurement data."""
|
|
||||||
if errors := self.data.get("errors"):
|
|
||||||
_LOGGER.error(errors[0])
|
|
||||||
return None
|
|
||||||
return self.data.get("data", {}).get("liveMeasurement")
|
|
||||||
|
|||||||
@@ -43,6 +43,9 @@
|
|||||||
"average_power": {
|
"average_power": {
|
||||||
"name": "Average power"
|
"name": "Average power"
|
||||||
},
|
},
|
||||||
|
"avg_price": {
|
||||||
|
"name": "Average price today"
|
||||||
|
},
|
||||||
"cellular_rssi": {
|
"cellular_rssi": {
|
||||||
"name": "Cellular signal strength"
|
"name": "Cellular signal strength"
|
||||||
},
|
},
|
||||||
@@ -136,6 +139,9 @@
|
|||||||
"grid_phase_count": {
|
"grid_phase_count": {
|
||||||
"name": "Number of grid phases"
|
"name": "Number of grid phases"
|
||||||
},
|
},
|
||||||
|
"intraday_price_ranking": {
|
||||||
|
"name": "Intraday price ranking"
|
||||||
|
},
|
||||||
"last_meter_consumption": {
|
"last_meter_consumption": {
|
||||||
"name": "Last meter consumption"
|
"name": "Last meter consumption"
|
||||||
},
|
},
|
||||||
@@ -145,15 +151,30 @@
|
|||||||
"max_power": {
|
"max_power": {
|
||||||
"name": "Max power"
|
"name": "Max power"
|
||||||
},
|
},
|
||||||
|
"max_price": {
|
||||||
|
"name": "Max price today"
|
||||||
|
},
|
||||||
"min_power": {
|
"min_power": {
|
||||||
"name": "Min power"
|
"name": "Min power"
|
||||||
},
|
},
|
||||||
|
"min_price": {
|
||||||
|
"name": "Min price today"
|
||||||
|
},
|
||||||
"month_cons": {
|
"month_cons": {
|
||||||
"name": "Monthly net consumption"
|
"name": "Monthly net consumption"
|
||||||
},
|
},
|
||||||
"month_cost": {
|
"month_cost": {
|
||||||
"name": "Monthly cost"
|
"name": "Monthly cost"
|
||||||
},
|
},
|
||||||
|
"off_peak_1": {
|
||||||
|
"name": "Off-peak 1 average"
|
||||||
|
},
|
||||||
|
"off_peak_2": {
|
||||||
|
"name": "Off-peak 2 average"
|
||||||
|
},
|
||||||
|
"peak": {
|
||||||
|
"name": "Peak average"
|
||||||
|
},
|
||||||
"peak_hour": {
|
"peak_hour": {
|
||||||
"name": "Monthly peak hour consumption"
|
"name": "Monthly peak hour consumption"
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
"""Test common."""
|
"""Test common."""
|
||||||
|
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
from unittest.mock import AsyncMock
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
CONSUMPTION_DATA_1 = [
|
CONSUMPTION_DATA_1 = [
|
||||||
{
|
{
|
||||||
@@ -49,8 +49,24 @@ def mock_get_homes(only_active=True):
|
|||||||
tibber_home.has_active_subscription = True
|
tibber_home.has_active_subscription = True
|
||||||
tibber_home.has_real_time_consumption = False
|
tibber_home.has_real_time_consumption = False
|
||||||
tibber_home.country = "NO"
|
tibber_home.country = "NO"
|
||||||
tibber_home.last_cons_data_timestamp = dt.datetime(2016, 1, 1, 12, 44, 57)
|
tibber_home.last_cons_data_timestamp = dt.datetime(
|
||||||
tibber_home.last_data_timestamp = dt.datetime(2016, 1, 1, 12, 48, 57)
|
2016, 1, 1, 12, 44, 57, tzinfo=dt.UTC
|
||||||
|
)
|
||||||
|
tibber_home.last_data_timestamp = dt.datetime(2016, 1, 1, 12, 48, 57, tzinfo=dt.UTC)
|
||||||
|
tibber_home.price_unit = "NOK/kWh"
|
||||||
|
tibber_home.current_price_data = MagicMock(
|
||||||
|
return_value=(0.0, None, None),
|
||||||
|
)
|
||||||
|
tibber_home.current_attributes = MagicMock(
|
||||||
|
return_value={
|
||||||
|
"max_price": 0.0,
|
||||||
|
"avg_price": 0.0,
|
||||||
|
"min_price": 0.0,
|
||||||
|
"off_peak_1": 0.0,
|
||||||
|
"peak": 0.0,
|
||||||
|
"off_peak_2": 0.0,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
def get_historic_data(n_data, resolution="HOURLY", production=False):
|
def get_historic_data(n_data, resolution="HOURLY", production=False):
|
||||||
return PRODUCTION_DATA_1 if production else CONSUMPTION_DATA_1
|
return PRODUCTION_DATA_1 if production else CONSUMPTION_DATA_1
|
||||||
|
|||||||
@@ -36,6 +36,8 @@ async def test_data_api_runtime_creates_client(hass: HomeAssistant) -> None:
|
|||||||
|
|
||||||
runtime = TibberRuntimeData(
|
runtime = TibberRuntimeData(
|
||||||
session=session,
|
session=session,
|
||||||
|
data_api_coordinator=MagicMock(),
|
||||||
|
data_coordinator=MagicMock(),
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch("homeassistant.components.tibber.tibber.Tibber") as mock_client_cls:
|
with patch("homeassistant.components.tibber.tibber.Tibber") as mock_client_cls:
|
||||||
@@ -72,6 +74,8 @@ async def test_data_api_runtime_missing_token_raises(hass: HomeAssistant) -> Non
|
|||||||
|
|
||||||
runtime = TibberRuntimeData(
|
runtime = TibberRuntimeData(
|
||||||
session=session,
|
session=session,
|
||||||
|
data_api_coordinator=MagicMock(),
|
||||||
|
data_coordinator=MagicMock(),
|
||||||
)
|
)
|
||||||
|
|
||||||
with pytest.raises(ConfigEntryAuthFailed):
|
with pytest.raises(ConfigEntryAuthFailed):
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
"""Test adding external statistics from Tibber."""
|
"""Test adding external statistics from Tibber."""
|
||||||
|
|
||||||
from unittest.mock import AsyncMock
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
from homeassistant.components.recorder import Recorder
|
from homeassistant.components.recorder import Recorder
|
||||||
from homeassistant.components.recorder.statistics import statistics_during_period
|
from homeassistant.components.recorder.statistics import statistics_during_period
|
||||||
@@ -24,7 +24,11 @@ async def test_async_setup_entry(
|
|||||||
tibber_connection.fetch_production_data_active_homes.return_value = None
|
tibber_connection.fetch_production_data_active_homes.return_value = None
|
||||||
tibber_connection.get_homes = mock_get_homes
|
tibber_connection.get_homes = mock_get_homes
|
||||||
|
|
||||||
coordinator = TibberDataCoordinator(hass, config_entry, tibber_connection)
|
runtime_data = MagicMock()
|
||||||
|
runtime_data.async_get_client = AsyncMock(return_value=tibber_connection)
|
||||||
|
config_entry.runtime_data = runtime_data
|
||||||
|
|
||||||
|
coordinator = TibberDataCoordinator(hass, config_entry)
|
||||||
await coordinator._async_update_data()
|
await coordinator._async_update_data()
|
||||||
await async_wait_recording_done(hass)
|
await async_wait_recording_done(hass)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user