Compare commits

..

7 Commits

Author SHA1 Message Date
Erik
897e470ff8 Improve docstrings in climate trigger tests 2025-12-19 14:10:56 +01:00
Erik Montnemery
6cc7d83def Add trigger climate.hvac_mode_changed (#159358) 2025-12-19 12:57:01 +01:00
Joost Lekkerkerker
5154418051 Add integration_type device to incomfort (#159173) 2025-12-19 12:34:16 +01:00
Josef Zweck
7e63c12b95 Add entity picture to lamarzocco (#158518) 2025-12-19 11:59:51 +01:00
puddly
d17e951591 Bump ZHA to 0.0.81 (#159396) 2025-12-19 10:27:50 +01:00
dependabot[bot]
9198e5f56d Bump actions/attest-build-provenance from 3.0.0 to 3.1.0 (#159405)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-19 10:06:42 +01:00
Ludovic BOUÉ
97d7e0e01e Matter Speaker volume LevelControl (#149490)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-12-19 10:01:55 +01:00
38 changed files with 548 additions and 1271 deletions

View File

@@ -551,7 +551,7 @@ jobs:
- name: Generate artifact attestation
if: needs.init.outputs.channel != 'dev' && needs.init.outputs.publish == 'true'
uses: actions/attest-build-provenance@977bb373ede98d70efdf65b84cb5f73e068dcc2a # v3.0.0
uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3.1.0
with:
subject-name: ${{ env.HASSFEST_IMAGE_NAME }}
subject-digest: ${{ steps.push.outputs.digest }}

View File

@@ -45,7 +45,7 @@ def make_entity_state_trigger_required_features(
"""Trigger for entity state changes."""
_domain = domain
_to_state = to_state
_to_states = {to_state}
_required_features = required_features
return CustomTrigger

View File

@@ -47,7 +47,7 @@ def make_binary_sensor_trigger(
"""Trigger for entity state changes."""
_device_class = device_class
_to_state = to_state
_to_states = {to_state}
return CustomTrigger

View File

@@ -98,6 +98,9 @@
}
},
"triggers": {
"hvac_mode_changed": {
"trigger": "mdi:thermostat"
},
"started_cooling": {
"trigger": "mdi:snowflake"
},

View File

@@ -298,6 +298,20 @@
},
"title": "Climate",
"triggers": {
"hvac_mode_changed": {
"description": "Triggers after the mode of one or more climate-control devices changes.",
"fields": {
"behavior": {
"description": "[%key:component::climate::common::trigger_behavior_description%]",
"name": "[%key:component::climate::common::trigger_behavior_name%]"
},
"hvac_mode": {
"description": "The HVAC modes to trigger on.",
"name": "Modes"
}
},
"name": "Climate-control device mode changed"
},
"started_cooling": {
"description": "Triggers after one or more climate-control devices start cooling.",
"fields": {

View File

@@ -1,8 +1,15 @@
"""Provides triggers for climates."""
import voluptuous as vol
from homeassistant.const import CONF_OPTIONS
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST,
EntityTargetStateTriggerBase,
Trigger,
TriggerConfig,
make_entity_target_state_attribute_trigger,
make_entity_target_state_trigger,
make_entity_transition_trigger,
@@ -10,7 +17,33 @@ from homeassistant.helpers.trigger import (
from .const import ATTR_HVAC_ACTION, DOMAIN, HVACAction, HVACMode
CONF_HVAC_MODE = "hvac_mode"
HVAC_MODE_CHANGED_TRIGGER_SCHEMA = ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST.extend(
{
vol.Required(CONF_OPTIONS): {
vol.Required(CONF_HVAC_MODE): vol.All(
cv.ensure_list, vol.Length(min=1), [HVACMode]
),
},
}
)
class HVACModeChangedTrigger(EntityTargetStateTriggerBase):
"""Trigger for entity state changes."""
_domain = DOMAIN
_schema = HVAC_MODE_CHANGED_TRIGGER_SCHEMA
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the state trigger."""
super().__init__(hass, config)
self._to_states = set(self._options[CONF_HVAC_MODE])
TRIGGERS: dict[str, type[Trigger]] = {
"hvac_mode_changed": HVACModeChangedTrigger,
"started_cooling": make_entity_target_state_attribute_trigger(
DOMAIN, ATTR_HVAC_ACTION, HVACAction.COOLING
),

View File

@@ -1,9 +1,9 @@
.trigger_common: &trigger_common
target:
target: &trigger_climate_target
entity:
domain: climate
fields:
behavior:
behavior: &trigger_behavior
required: true
default: any
selector:
@@ -19,3 +19,18 @@ started_drying: *trigger_common
started_heating: *trigger_common
turned_off: *trigger_common
turned_on: *trigger_common
hvac_mode_changed:
target: *trigger_climate_target
fields:
behavior: *trigger_behavior
hvac_mode:
context:
filter_target: target
required: true
selector:
state:
# Note: This should allow selecting multiple modes, but state selector does not support that yet.
hide_states:
- unavailable
- unknown

View File

@@ -8,6 +8,7 @@
{ "registered_devices": true }
],
"documentation": "https://www.home-assistant.io/integrations/incomfort",
"integration_type": "hub",
"iot_class": "local_polling",
"loggers": ["incomfortclient"],
"quality_scale": "platinum",

View File

@@ -39,19 +39,6 @@ class LaMarzoccoSwitchEntityDescription(
ENTITIES: tuple[LaMarzoccoSwitchEntityDescription, ...] = (
LaMarzoccoSwitchEntityDescription(
key="main",
translation_key="main",
name=None,
control_fn=lambda machine, state: machine.set_power(state),
is_on_fn=(
lambda machine: cast(
MachineStatus, machine.dashboard.config[WidgetType.CM_MACHINE_STATUS]
).mode
is MachineMode.BREWING_MODE
),
bt_offline_mode=True,
),
LaMarzoccoSwitchEntityDescription(
key="steam_boiler_enable",
translation_key="steam_boiler",
@@ -98,6 +85,20 @@ ENTITIES: tuple[LaMarzoccoSwitchEntityDescription, ...] = (
),
)
MAIN_SWITCH_ENTITY = LaMarzoccoSwitchEntityDescription(
key="main",
translation_key="main",
name=None,
control_fn=lambda machine, state: machine.set_power(state),
is_on_fn=(
lambda machine: cast(
MachineStatus, machine.dashboard.config[WidgetType.CM_MACHINE_STATUS]
).mode
is MachineMode.BREWING_MODE
),
bt_offline_mode=True,
)
async def async_setup_entry(
hass: HomeAssistant,
@@ -107,12 +108,11 @@ async def async_setup_entry(
"""Set up switch entities and services."""
coordinator = entry.runtime_data.config_coordinator
bluetooth_coordinator = entry.runtime_data.bluetooth_coordinator
entities: list[SwitchEntity] = []
entities.extend(
LaMarzoccoSwitchEntity(
coordinator, description, entry.runtime_data.bluetooth_coordinator
)
LaMarzoccoSwitchEntity(coordinator, description, bluetooth_coordinator)
for description in ENTITIES
if description.supported_fn(coordinator)
)
@@ -122,6 +122,12 @@ async def async_setup_entry(
for wake_up_sleep_entry in coordinator.device.schedule.smart_wake_up_sleep.schedules
)
entities.append(
LaMarzoccoMainSwitchEntity(
coordinator, MAIN_SWITCH_ENTITY, bluetooth_coordinator
)
)
async_add_entities(entities)
@@ -160,6 +166,17 @@ class LaMarzoccoSwitchEntity(LaMarzoccoEntity, SwitchEntity):
return self.entity_description.is_on_fn(self.coordinator.device)
class LaMarzoccoMainSwitchEntity(LaMarzoccoSwitchEntity):
"""Switch representing espresso machine main power."""
@property
def entity_picture(self) -> str | None:
"""Return the entity picture."""
image_url = self.coordinator.device.dashboard.image_url
return image_url if image_url else None # image URL can be empty string
class LaMarzoccoAutoOnOffSwitchEntity(LaMarzoccoBaseEntity, SwitchEntity):
"""Switch representing espresso machine auto on/off."""

View File

@@ -42,6 +42,9 @@
"number": {
"cook_time": {
"default": "mdi:microwave"
},
"speaker_setpoint": {
"default": "mdi:speaker"
}
},
"select": {

View File

@@ -365,6 +365,31 @@ DISCOVERY_SCHEMAS = [
clusters.MicrowaveOvenControl.Attributes.MaxCookTime,
),
),
MatterDiscoverySchema(
platform=Platform.NUMBER,
entity_description=MatterRangeNumberEntityDescription(
key="speaker_setpoint",
translation_key="speaker_setpoint",
native_unit_of_measurement=PERCENTAGE,
command=lambda value: clusters.LevelControl.Commands.MoveToLevel(
level=int(value)
),
native_min_value=0,
native_max_value=100,
native_step=1,
device_to_ha=lambda x: None if x is None else x,
min_attribute=clusters.LevelControl.Attributes.MinLevel,
max_attribute=clusters.LevelControl.Attributes.MaxLevel,
mode=NumberMode.SLIDER,
),
entity_class=MatterRangeNumber,
required_attributes=(
clusters.LevelControl.Attributes.CurrentLevel,
clusters.LevelControl.Attributes.MinLevel,
clusters.LevelControl.Attributes.MaxLevel,
),
device_type=(device_types.Speaker,),
),
MatterDiscoverySchema(
platform=Platform.NUMBER,
entity_description=MatterNumberEntityDescription(

View File

@@ -232,6 +232,9 @@
"pump_setpoint": {
"name": "Setpoint"
},
"speaker_setpoint": {
"name": "Volume"
},
"temperature_offset": {
"name": "Temperature offset"
},

View File

@@ -1,36 +1,20 @@
"""Support for Tibber."""
from __future__ import annotations
from dataclasses import dataclass, field
import logging
import aiohttp
from aiohttp.client_exceptions import ClientError, ClientResponseError
import tibber
from tibber import data_api as tibber_data_api
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN, EVENT_HOMEASSISTANT_STOP, Platform
from homeassistant.core import Event, HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.config_entry_oauth2_flow import (
ImplementationUnavailableError,
OAuth2Session,
async_get_config_entry_implementation,
)
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import dt as dt_util, ssl as ssl_util
from .const import (
AUTH_IMPLEMENTATION,
CONF_LEGACY_ACCESS_TOKEN,
DATA_HASS_CONFIG,
DOMAIN,
TibberConfigEntry,
)
from .coordinator import TibberDataAPICoordinator
from .const import DATA_HASS_CONFIG, DOMAIN
from .services import async_setup_services
PLATFORMS = [Platform.NOTIFY, Platform.SENSOR]
@@ -40,33 +24,6 @@ CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
_LOGGER = logging.getLogger(__name__)
@dataclass
class TibberRuntimeData:
"""Runtime data for Tibber API entries."""
tibber_connection: tibber.Tibber
session: OAuth2Session
data_api_coordinator: TibberDataAPICoordinator | None = field(default=None)
_client: tibber_data_api.TibberDataAPI | None = None
async def async_get_client(
self, hass: HomeAssistant
) -> tibber_data_api.TibberDataAPI:
"""Return an authenticated Tibber Data API client."""
await self.session.async_ensure_token_valid()
token = self.session.token
access_token = token.get(CONF_ACCESS_TOKEN)
if not access_token:
raise ConfigEntryAuthFailed("Access token missing from OAuth session")
if self._client is None:
self._client = tibber_data_api.TibberDataAPI(
access_token,
websession=async_get_clientsession(hass),
)
self._client.set_access_token(access_token)
return self._client
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Tibber component."""
@@ -77,23 +34,16 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True
async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a config entry."""
# Added in 2026.1 to migrate existing users to OAuth2 (Tibber Data API).
# Can be removed after 2026.5
if AUTH_IMPLEMENTATION not in entry.data:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="data_api_reauth_required",
)
tibber_connection = tibber.Tibber(
access_token=entry.data[CONF_LEGACY_ACCESS_TOKEN],
access_token=entry.data[CONF_ACCESS_TOKEN],
websession=async_get_clientsession(hass),
time_zone=dt_util.get_default_time_zone(),
ssl=ssl_util.get_default_context(),
)
hass.data[DOMAIN] = tibber_connection
async def _close(event: Event) -> None:
await tibber_connection.rt_disconnect()
@@ -102,6 +52,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bo
try:
await tibber_connection.update_info()
except (
TimeoutError,
aiohttp.ClientError,
@@ -114,45 +65,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bo
except tibber.FatalHttpExceptionError:
return False
try:
implementation = await async_get_config_entry_implementation(hass, entry)
except ImplementationUnavailableError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="oauth2_implementation_unavailable",
) from err
session = OAuth2Session(hass, entry, implementation)
try:
await session.async_ensure_token_valid()
except ClientResponseError as err:
if 400 <= err.status < 500:
raise ConfigEntryAuthFailed(
"OAuth session is not valid, reauthentication required"
) from err
raise ConfigEntryNotReady from err
except ClientError as err:
raise ConfigEntryNotReady from err
entry.runtime_data = TibberRuntimeData(
tibber_connection=tibber_connection,
session=session,
)
coordinator = TibberDataAPICoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data.data_api_coordinator = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(
hass: HomeAssistant, config_entry: TibberConfigEntry
) -> bool:
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(
unload_ok = await hass.config_entries.async_unload_platforms(
config_entry, PLATFORMS
):
await config_entry.runtime_data.tibber_connection.rt_disconnect()
)
if unload_ok:
tibber_connection = hass.data[DOMAIN]
await tibber_connection.rt_disconnect()
return unload_ok

View File

@@ -1,15 +0,0 @@
"""Application credentials platform for Tibber."""
from homeassistant.components.application_credentials import AuthorizationServer
from homeassistant.core import HomeAssistant
AUTHORIZE_URL = "https://thewall.tibber.com/connect/authorize"
TOKEN_URL = "https://thewall.tibber.com/connect/token"
async def async_get_authorization_server(hass: HomeAssistant) -> AuthorizationServer:
"""Return authorization server for Tibber Data API."""
return AuthorizationServer(
authorize_url=AUTHORIZE_URL,
token_url=TOKEN_URL,
)

View File

@@ -2,164 +2,80 @@
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
import aiohttp
import tibber
from tibber import data_api as tibber_data_api
import voluptuous as vol
from homeassistant.config_entries import SOURCE_REAUTH, SOURCE_USER, ConfigFlowResult
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.config_entry_oauth2_flow import AbstractOAuth2FlowHandler
from .const import CONF_LEGACY_ACCESS_TOKEN, DATA_API_DEFAULT_SCOPES, DOMAIN
from .const import DOMAIN
DATA_SCHEMA = vol.Schema({vol.Required(CONF_LEGACY_ACCESS_TOKEN): str})
DATA_SCHEMA = vol.Schema({vol.Required(CONF_ACCESS_TOKEN): str})
ERR_TIMEOUT = "timeout"
ERR_CLIENT = "cannot_connect"
ERR_TOKEN = "invalid_access_token"
TOKEN_URL = "https://developer.tibber.com/settings/access-token"
_LOGGER = logging.getLogger(__name__)
class TibberConfigFlow(AbstractOAuth2FlowHandler, domain=DOMAIN):
class TibberConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Tibber integration."""
VERSION = 1
DOMAIN = DOMAIN
def __init__(self) -> None:
"""Initialize the config flow."""
super().__init__()
self._access_token: str | None = None
self._title = ""
@property
def logger(self) -> logging.Logger:
"""Return the logger."""
return _LOGGER
@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data appended to the authorize URL."""
return {
**super().extra_authorize_data,
"scope": " ".join(DATA_API_DEFAULT_SCOPES),
}
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
if user_input is None:
data_schema = self.add_suggested_values_to_schema(
DATA_SCHEMA, {CONF_LEGACY_ACCESS_TOKEN: self._access_token or ""}
self._async_abort_entries_match()
if user_input is not None:
access_token = user_input[CONF_ACCESS_TOKEN].replace(" ", "")
tibber_connection = tibber.Tibber(
access_token=access_token,
websession=async_get_clientsession(self.hass),
)
return self.async_show_form(
step_id=SOURCE_USER,
data_schema=data_schema,
description_placeholders={"url": TOKEN_URL},
errors={},
)
errors = {}
self._access_token = user_input[CONF_LEGACY_ACCESS_TOKEN].replace(" ", "")
tibber_connection = tibber.Tibber(
access_token=self._access_token,
websession=async_get_clientsession(self.hass),
)
self._title = tibber_connection.name or "Tibber"
try:
await tibber_connection.update_info()
except TimeoutError:
errors[CONF_ACCESS_TOKEN] = ERR_TIMEOUT
except tibber.InvalidLoginError:
errors[CONF_ACCESS_TOKEN] = ERR_TOKEN
except (
aiohttp.ClientError,
tibber.RetryableHttpExceptionError,
tibber.FatalHttpExceptionError,
):
errors[CONF_ACCESS_TOKEN] = ERR_CLIENT
errors: dict[str, str] = {}
try:
await tibber_connection.update_info()
except TimeoutError:
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_TIMEOUT
except tibber.InvalidLoginError:
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_TOKEN
except (
aiohttp.ClientError,
tibber.RetryableHttpExceptionError,
tibber.FatalHttpExceptionError,
):
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_CLIENT
if errors:
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
description_placeholders={"url": TOKEN_URL},
errors=errors,
)
if errors:
data_schema = self.add_suggested_values_to_schema(
DATA_SCHEMA, {CONF_LEGACY_ACCESS_TOKEN: self._access_token or ""}
)
return self.async_show_form(
step_id=SOURCE_USER,
data_schema=data_schema,
description_placeholders={"url": TOKEN_URL},
errors=errors,
)
await self.async_set_unique_id(tibber_connection.user_id)
if self.source == SOURCE_REAUTH:
reauth_entry = self._get_reauth_entry()
self._abort_if_unique_id_mismatch(
reason="wrong_account",
description_placeholders={"title": reauth_entry.title},
)
else:
unique_id = tibber_connection.user_id
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
return await self.async_step_pick_implementation()
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Handle a reauth flow."""
reauth_entry = self._get_reauth_entry()
self._access_token = reauth_entry.data.get(CONF_LEGACY_ACCESS_TOKEN)
self._title = reauth_entry.title
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm reauthentication by reusing the user step."""
reauth_entry = self._get_reauth_entry()
self._access_token = reauth_entry.data.get(CONF_LEGACY_ACCESS_TOKEN)
self._title = reauth_entry.title
if user_input is None:
return self.async_show_form(
step_id="reauth_confirm",
return self.async_create_entry(
title=tibber_connection.name,
data={CONF_ACCESS_TOKEN: access_token},
)
return await self.async_step_user()
async def async_oauth_create_entry(self, data: dict) -> ConfigFlowResult:
"""Finalize the OAuth flow and create the config entry."""
if self._access_token is None:
return self.async_abort(reason="missing_configuration")
data[CONF_LEGACY_ACCESS_TOKEN] = self._access_token
access_token = data[CONF_TOKEN][CONF_ACCESS_TOKEN]
data_api_client = tibber_data_api.TibberDataAPI(
access_token,
websession=async_get_clientsession(self.hass),
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
description_placeholders={"url": TOKEN_URL},
errors={},
)
try:
await data_api_client.get_userinfo()
except (aiohttp.ClientError, TimeoutError):
return self.async_abort(reason="cannot_connect")
if self.source == SOURCE_REAUTH:
reauth_entry = self._get_reauth_entry()
return self.async_update_reload_and_abort(
reauth_entry,
data=data,
title=self._title,
)
return self.async_create_entry(title=self._title, data=data)

View File

@@ -1,34 +1,5 @@
"""Constants for Tibber integration."""
from __future__ import annotations
from typing import TYPE_CHECKING
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN
if TYPE_CHECKING:
from . import TibberRuntimeData
type TibberConfigEntry = ConfigEntry[TibberRuntimeData]
CONF_LEGACY_ACCESS_TOKEN = CONF_ACCESS_TOKEN
AUTH_IMPLEMENTATION = "auth_implementation"
DATA_HASS_CONFIG = "tibber_hass_config"
DOMAIN = "tibber"
MANUFACTURER = "Tibber"
DATA_API_DEFAULT_SCOPES = [
"openid",
"profile",
"email",
"offline_access",
"data-api-user-read",
"data-api-chargers-read",
"data-api-energy-systems-read",
"data-api-homes-read",
"data-api-thermostats-read",
"data-api-vehicles-read",
"data-api-inverters-read",
]

View File

@@ -4,11 +4,9 @@ from __future__ import annotations
from datetime import timedelta
import logging
from typing import TYPE_CHECKING, cast
from typing import cast
from aiohttp.client_exceptions import ClientError
import tibber
from tibber.data_api import TibberDataAPI, TibberDevice
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import (
@@ -21,18 +19,15 @@ from homeassistant.components.recorder.statistics import (
get_last_statistics,
statistics_during_period,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfEnergy
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from homeassistant.util.unit_conversion import EnergyConverter
from .const import DOMAIN
if TYPE_CHECKING:
from .const import TibberConfigEntry
FIVE_YEARS = 5 * 365 * 24
_LOGGER = logging.getLogger(__name__)
@@ -41,12 +36,12 @@ _LOGGER = logging.getLogger(__name__)
class TibberDataCoordinator(DataUpdateCoordinator[None]):
"""Handle Tibber data and insert statistics."""
config_entry: TibberConfigEntry
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: TibberConfigEntry,
config_entry: ConfigEntry,
tibber_connection: tibber.Tibber,
) -> None:
"""Initialize the data handler."""
@@ -192,64 +187,3 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
unit_of_measurement=unit,
)
async_add_external_statistics(self.hass, metadata, statistics)
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
"""Fetch and cache Tibber Data API device capabilities."""
config_entry: TibberConfigEntry
def __init__(
self,
hass: HomeAssistant,
entry: TibberConfigEntry,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name=f"{DOMAIN} Data API",
update_interval=timedelta(minutes=1),
config_entry=entry,
)
self._runtime_data = entry.runtime_data
self.sensors_by_device: dict[str, dict[str, tibber.data_api.Sensor]] = {}
def _build_sensor_lookup(self, devices: dict[str, TibberDevice]) -> None:
"""Build sensor lookup dict for efficient access."""
self.sensors_by_device = {
device_id: {sensor.id: sensor for sensor in device.sensors}
for device_id, device in devices.items()
}
def get_sensor(
self, device_id: str, sensor_id: str
) -> tibber.data_api.Sensor | None:
"""Get a sensor by device and sensor ID."""
if device_sensors := self.sensors_by_device.get(device_id):
return device_sensors.get(sensor_id)
return None
async def _async_get_client(self) -> TibberDataAPI:
"""Get the Tibber Data API client with error handling."""
try:
return await self._runtime_data.async_get_client(self.hass)
except ConfigEntryAuthFailed:
raise
except (ClientError, TimeoutError, tibber.UserAgentMissingError) as err:
raise UpdateFailed(
f"Unable to create Tibber Data API client: {err}"
) from err
async def _async_setup(self) -> None:
"""Initial load of Tibber Data API devices."""
client = await self._async_get_client()
devices = await client.get_all_devices()
self._build_sensor_lookup(devices)
async def _async_update_data(self) -> dict[str, TibberDevice]:
"""Fetch the latest device capabilities from the Tibber Data API."""
client = await self._async_get_client()
devices: dict[str, TibberDevice] = await client.update_devices()
self._build_sensor_lookup(devices)
return devices

View File

@@ -4,22 +4,21 @@ from __future__ import annotations
from typing import Any
import aiohttp
import tibber
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from .const import TibberConfigEntry
from .const import DOMAIN
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, config_entry: TibberConfigEntry
hass: HomeAssistant, config_entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
tibber_connection: tibber.Tibber = hass.data[DOMAIN]
runtime = config_entry.runtime_data
result: dict[str, Any] = {
return {
"homes": [
{
"last_data_timestamp": home.last_data_timestamp,
@@ -28,38 +27,6 @@ async def async_get_config_entry_diagnostics(
"last_cons_data_timestamp": home.last_cons_data_timestamp,
"country": home.country,
}
for home in runtime.tibber_connection.get_homes(only_active=False)
for home in tibber_connection.get_homes(only_active=False)
]
}
devices: dict[str, Any] = {}
error: str | None = None
try:
coordinator = runtime.data_api_coordinator
if coordinator is not None:
devices = coordinator.data
except ConfigEntryAuthFailed:
error = "Authentication failed"
except TimeoutError:
error = "Timeout error"
except aiohttp.ClientError:
error = "Client error"
except tibber.InvalidLoginError:
error = "Invalid login"
except tibber.RetryableHttpExceptionError as err:
error = f"Retryable HTTP error ({err.status})"
except tibber.FatalHttpExceptionError as err:
error = f"Fatal HTTP error ({err.status})"
result["error"] = error
result["devices"] = [
{
"id": device.id,
"name": device.name,
"brand": device.brand,
"model": device.model,
}
for device in devices.values()
]
return result

View File

@@ -3,9 +3,9 @@
"name": "Tibber",
"codeowners": ["@danielhiversen"],
"config_flow": true,
"dependencies": ["application_credentials", "recorder"],
"dependencies": ["recorder"],
"documentation": "https://www.home-assistant.io/integrations/tibber",
"iot_class": "cloud_polling",
"loggers": ["tibber"],
"requirements": ["pyTibber==0.33.1"]
"requirements": ["pyTibber==0.32.2"]
}

View File

@@ -2,25 +2,28 @@
from __future__ import annotations
from tibber import Tibber
from homeassistant.components.notify import (
ATTR_TITLE_DEFAULT,
NotifyEntity,
NotifyEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import DOMAIN, TibberConfigEntry
from . import DOMAIN
async def async_setup_entry(
hass: HomeAssistant,
entry: TibberConfigEntry,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Tibber notification entity."""
async_add_entities([TibberNotificationEntity(entry)])
async_add_entities([TibberNotificationEntity(entry.entry_id)])
class TibberNotificationEntity(NotifyEntity):
@@ -30,14 +33,13 @@ class TibberNotificationEntity(NotifyEntity):
_attr_name = DOMAIN
_attr_icon = "mdi:message-flash"
def __init__(self, entry: TibberConfigEntry) -> None:
def __init__(self, unique_id: str) -> None:
"""Initialize Tibber notify entity."""
self._attr_unique_id = entry.entry_id
self._entry = entry
self._attr_unique_id = unique_id
async def async_send_message(self, message: str, title: str | None = None) -> None:
"""Send a message to Tibber devices."""
tibber_connection = self._entry.runtime_data.tibber_connection
tibber_connection: Tibber = self.hass.data[DOMAIN]
try:
await tibber_connection.send_notification(
title or ATTR_TITLE_DEFAULT, message

View File

@@ -10,8 +10,7 @@ from random import randrange
from typing import Any
import aiohttp
from tibber import FatalHttpExceptionError, RetryableHttpExceptionError, TibberHome
from tibber.data_api import TibberDevice
import tibber
from homeassistant.components.sensor import (
SensorDeviceClass,
@@ -28,7 +27,6 @@ from homeassistant.const import (
UnitOfElectricCurrent,
UnitOfElectricPotential,
UnitOfEnergy,
UnitOfLength,
UnitOfPower,
)
from homeassistant.core import Event, HomeAssistant, callback
@@ -43,8 +41,8 @@ from homeassistant.helpers.update_coordinator import (
)
from homeassistant.util import Throttle, dt as dt_util
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
from .const import DOMAIN, MANUFACTURER
from .coordinator import TibberDataCoordinator
_LOGGER = logging.getLogger(__name__)
@@ -262,65 +260,14 @@ SENSORS: tuple[SensorEntityDescription, ...] = (
)
DATA_API_SENSORS: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
key="storage.stateOfCharge",
translation_key="storage_state_of_charge",
device_class=SensorDeviceClass.BATTERY,
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
SensorEntityDescription(
key="storage.targetStateOfCharge",
translation_key="storage_target_state_of_charge",
device_class=SensorDeviceClass.BATTERY,
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
SensorEntityDescription(
key="range.remaining",
translation_key="range_remaining",
device_class=SensorDeviceClass.DISTANCE,
native_unit_of_measurement=UnitOfLength.KILOMETERS,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=1,
),
SensorEntityDescription(
key="charging.current.max",
translation_key="charging_current_max",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
),
SensorEntityDescription(
key="charging.current.offlineFallback",
translation_key="charging_current_offline_fallback",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: TibberConfigEntry,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Tibber sensor."""
_setup_data_api_sensors(entry, async_add_entities)
await _async_setup_graphql_sensors(hass, entry, async_add_entities)
async def _async_setup_graphql_sensors(
hass: HomeAssistant,
entry: TibberConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Tibber sensor."""
tibber_connection = entry.runtime_data.tibber_connection
tibber_connection = hass.data[DOMAIN]
entity_registry = er.async_get(hass)
device_registry = dr.async_get(hass)
@@ -333,11 +280,7 @@ async def _async_setup_graphql_sensors(
except TimeoutError as err:
_LOGGER.error("Timeout connecting to Tibber home: %s ", err)
raise PlatformNotReady from err
except (
RetryableHttpExceptionError,
FatalHttpExceptionError,
aiohttp.ClientError,
) as err:
except (tibber.RetryableHttpExceptionError, aiohttp.ClientError) as err:
_LOGGER.error("Error connecting to Tibber home: %s ", err)
raise PlatformNotReady from err
@@ -382,72 +325,7 @@ async def _async_setup_graphql_sensors(
device_entry.id, new_identifiers={(DOMAIN, home.home_id)}
)
async_add_entities(entities)
def _setup_data_api_sensors(
entry: TibberConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up sensors backed by the Tibber Data API."""
coordinator = entry.runtime_data.data_api_coordinator
if coordinator is None:
return
entities: list[TibberDataAPISensor] = []
api_sensors = {sensor.key: sensor for sensor in DATA_API_SENSORS}
for device in coordinator.data.values():
for sensor in device.sensors:
description: SensorEntityDescription | None = api_sensors.get(sensor.id)
if description is None:
_LOGGER.debug(
"Sensor %s not found in DATA_API_SENSORS, skipping", sensor
)
continue
entities.append(
TibberDataAPISensor(
coordinator, device, description, sensor.description
)
)
async_add_entities(entities)
class TibberDataAPISensor(CoordinatorEntity[TibberDataAPICoordinator], SensorEntity):
"""Representation of a Tibber Data API capability sensor."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: TibberDataAPICoordinator,
device: TibberDevice,
entity_description: SensorEntityDescription,
name: str,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self._device_id: str = device.id
self.entity_description = entity_description
self._attr_name = name
self._attr_unique_id = f"{device.external_id}_{self.entity_description.key}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, device.external_id)},
name=device.name,
manufacturer=device.brand,
model=device.model,
)
@property
def native_value(self) -> StateType:
"""Return the value reported by the device."""
sensors = self.coordinator.sensors_by_device.get(self._device_id, {})
sensor = sensors.get(self.entity_description.key)
return sensor.value if sensor else None
async_add_entities(entities, True)
class TibberSensor(SensorEntity):
@@ -455,7 +333,9 @@ class TibberSensor(SensorEntity):
_attr_has_entity_name = True
def __init__(self, *args: Any, tibber_home: TibberHome, **kwargs: Any) -> None:
def __init__(
self, *args: Any, tibber_home: tibber.TibberHome, **kwargs: Any
) -> None:
"""Initialize the sensor."""
super().__init__(*args, **kwargs)
self._tibber_home = tibber_home
@@ -486,7 +366,7 @@ class TibberSensorElPrice(TibberSensor):
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_translation_key = "electricity_price"
def __init__(self, tibber_home: TibberHome) -> None:
def __init__(self, tibber_home: tibber.TibberHome) -> None:
"""Initialize the sensor."""
super().__init__(tibber_home=tibber_home)
self._last_updated: datetime.datetime | None = None
@@ -563,7 +443,7 @@ class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
def __init__(
self,
tibber_home: TibberHome,
tibber_home: tibber.TibberHome,
coordinator: TibberDataCoordinator,
entity_description: SensorEntityDescription,
) -> None:
@@ -590,7 +470,7 @@ class TibberSensorRT(TibberSensor, CoordinatorEntity["TibberRtDataCoordinator"])
def __init__(
self,
tibber_home: TibberHome,
tibber_home: tibber.TibberHome,
description: SensorEntityDescription,
initial_state: float,
coordinator: TibberRtDataCoordinator,
@@ -652,7 +532,7 @@ class TibberRtEntityCreator:
def __init__(
self,
async_add_entities: AddConfigEntryEntitiesCallback,
tibber_home: TibberHome,
tibber_home: tibber.TibberHome,
entity_registry: er.EntityRegistry,
) -> None:
"""Initialize the data handler."""
@@ -738,7 +618,7 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en
hass: HomeAssistant,
config_entry: ConfigEntry,
add_sensor_callback: Callable[[TibberRtDataCoordinator, Any], None],
tibber_home: TibberHome,
tibber_home: tibber.TibberHome,
) -> None:
"""Initialize the data handler."""
self._add_sensor_callback = add_sensor_callback

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import datetime as dt
from datetime import datetime
from typing import TYPE_CHECKING, Any, Final
from typing import Any, Final
import voluptuous as vol
@@ -20,9 +20,6 @@ from homeassistant.util import dt as dt_util
from .const import DOMAIN
if TYPE_CHECKING:
from .const import TibberConfigEntry
PRICE_SERVICE_NAME = "get_prices"
ATTR_START: Final = "start"
ATTR_END: Final = "end"
@@ -36,13 +33,7 @@ SERVICE_SCHEMA: Final = vol.Schema(
async def __get_prices(call: ServiceCall) -> ServiceResponse:
entries: list[TibberConfigEntry] = call.hass.config_entries.async_entries(DOMAIN)
if not entries:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="no_config_entry",
)
tibber_connection = entries[0].runtime_data.tibber_connection
tibber_connection = call.hass.data[DOMAIN]
start = __get_date(call.data.get(ATTR_START), "start")
end = __get_date(call.data.get(ATTR_END), "end")
@@ -66,7 +57,7 @@ async def __get_prices(call: ServiceCall) -> ServiceResponse:
selected_data = [
price
for price in price_data
if start <= dt.datetime.fromisoformat(str(price["start_time"])) < end
if start <= dt.datetime.fromisoformat(price["start_time"]) < end
]
tibber_prices[home_nickname] = selected_data

View File

@@ -1,11 +1,7 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
"missing_configuration": "[%key:common::config_flow::abort::oauth2_missing_configuration%]",
"missing_credentials": "[%key:common::config_flow::abort::oauth2_missing_credentials%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
"wrong_account": "The connected account does not match {title}. Sign in with the same Tibber account and try again."
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
@@ -13,10 +9,6 @@
"timeout": "[%key:common::config_flow::error::timeout_connect%]"
},
"step": {
"reauth_confirm": {
"description": "Reconnect your Tibber account to refresh access.",
"title": "[%key:common::config_flow::title::reauth%]"
},
"user": {
"data": {
"access_token": "[%key:common::config_flow::data::access_token%]"
@@ -48,12 +40,6 @@
"average_power": {
"name": "Average power"
},
"charging_current_max": {
"name": "Maximum charging current"
},
"charging_current_offline_fallback": {
"name": "Offline fallback charging current"
},
"current_l1": {
"name": "Current L1"
},
@@ -102,18 +88,9 @@
"power_production": {
"name": "Power production"
},
"range_remaining": {
"name": "Remaining range"
},
"signal_strength": {
"name": "Signal strength"
},
"storage_state_of_charge": {
"name": "Storage state of charge"
},
"storage_target_state_of_charge": {
"name": "Storage target state of charge"
},
"voltage_phase1": {
"name": "Voltage phase1"
},
@@ -126,18 +103,9 @@
}
},
"exceptions": {
"data_api_reauth_required": {
"message": "Reconnect Tibber so Home Assistant can enable the new Tibber Data API features."
},
"invalid_date": {
"message": "Invalid datetime provided {date}"
},
"no_config_entry": {
"message": "No Tibber integration configured"
},
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
},
"send_message_timeout": {
"message": "Timeout sending message with Tibber"
}

View File

@@ -22,7 +22,7 @@
"zha",
"universal_silabs_flasher"
],
"requirements": ["zha==0.0.80"],
"requirements": ["zha==0.0.81"],
"usb": [
{
"description": "*2652*",

View File

@@ -38,7 +38,6 @@ APPLICATION_CREDENTIALS = [
"smartthings",
"spotify",
"tesla_fleet",
"tibber",
"twitch",
"volvo",
"watts",

View File

@@ -433,11 +433,21 @@ class EntityTriggerBase(Trigger):
class EntityTargetStateTriggerBase(EntityTriggerBase):
"""Trigger for entity state changes to a specific state."""
_to_state: str
_to_states: set[str]
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check if the origin state is valid and the state has changed."""
if from_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
return False
return (
from_state.state != to_state.state
and from_state.state not in self._to_states
)
def is_valid_state(self, state: State) -> bool:
"""Check if the new state matches the expected state."""
return state.state == self._to_state
return state.state in self._to_states
class EntityTransitionTriggerBase(EntityTriggerBase):
@@ -495,15 +505,20 @@ class EntityTargetStateAttributeTriggerBase(EntityTriggerBase):
def make_entity_target_state_trigger(
domain: str, to_state: str
domain: str, to_states: str | set[str]
) -> type[EntityTargetStateTriggerBase]:
"""Create a trigger for entity state changes to a specific state."""
"""Create a trigger for entity state changes to specific state(s)."""
if isinstance(to_states, str):
to_states_set = {to_states}
else:
to_states_set = to_states
class CustomTrigger(EntityTargetStateTriggerBase):
"""Trigger for entity state changes."""
_domain = domain
_to_state = to_state
_to_states = to_states_set
return CustomTrigger

4
requirements_all.txt generated
View File

@@ -1864,7 +1864,7 @@ pyRFXtrx==0.31.1
pySDCP==1
# homeassistant.components.tibber
pyTibber==0.33.1
pyTibber==0.32.2
# homeassistant.components.dlink
pyW215==0.8.0
@@ -3264,7 +3264,7 @@ zeroconf==0.148.0
zeversolar==0.3.2
# homeassistant.components.zha
zha==0.0.80
zha==0.0.81
# homeassistant.components.zhong_hong
zhong-hong-hvac==1.0.13

View File

@@ -1592,7 +1592,7 @@ pyHomee==1.3.8
pyRFXtrx==0.31.1
# homeassistant.components.tibber
pyTibber==0.33.1
pyTibber==0.32.2
# homeassistant.components.dlink
pyW215==0.8.0
@@ -2725,7 +2725,7 @@ zeroconf==0.148.0
zeversolar==0.3.2
# homeassistant.components.zha
zha==0.0.80
zha==0.0.81
# homeassistant.components.zwave_js
zwave-js-server-python==0.67.1

View File

@@ -1,5 +1,6 @@
"""The tests for components."""
from collections.abc import Iterable
from enum import StrEnum
import itertools
from typing import TypedDict
@@ -345,6 +346,15 @@ def set_or_remove_state(
)
def other_states(state: StrEnum) -> list[str]:
def other_states(state: StrEnum | Iterable[StrEnum]) -> list[str]:
"""Return a sorted list with all states except the specified one."""
return sorted({s.value for s in state.__class__} - {state.value})
if isinstance(state, StrEnum):
excluded_values = {state.value}
enum_class = state.__class__
else:
if len(state) == 0:
raise ValueError("state iterable must not be empty")
excluded_values = {s.value for s in state}
enum_class = list(state)[0].__class__
return sorted({s.value for s in enum_class} - excluded_values)

View File

@@ -1,17 +1,22 @@
"""Test climate trigger."""
from collections.abc import Generator
from contextlib import AbstractContextManager, nullcontext as does_not_raise
from typing import Any
from unittest.mock import patch
import pytest
import voluptuous as vol
from homeassistant.components.climate.const import (
ATTR_HVAC_ACTION,
HVACAction,
HVACMode,
)
from homeassistant.const import ATTR_LABEL_ID, CONF_ENTITY_ID
from homeassistant.components.climate.trigger import CONF_HVAC_MODE
from homeassistant.const import ATTR_LABEL_ID, CONF_ENTITY_ID, CONF_OPTIONS, CONF_TARGET
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers.trigger import async_validate_trigger_config
from tests.components import (
StateDescription,
@@ -48,6 +53,7 @@ async def target_climates(hass: HomeAssistant) -> list[str]:
@pytest.mark.parametrize(
"trigger_key",
[
"climate.hvac_mode_changed",
"climate.turned_off",
"climate.turned_on",
"climate.started_heating",
@@ -66,20 +72,105 @@ async def test_climate_triggers_gated_by_labs_flag(
) in caplog.text
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger", "trigger_options", "expected_result"),
[
# Test validating climate.hvac_mode_changed
# Valid configurations
(
"climate.hvac_mode_changed",
{CONF_HVAC_MODE: [HVACMode.HEAT, HVACMode.COOL]},
does_not_raise(),
),
(
"climate.hvac_mode_changed",
{CONF_HVAC_MODE: HVACMode.HEAT},
does_not_raise(),
),
# Invalid configurations
(
"climate.hvac_mode_changed",
# Empty hvac_mode list
{CONF_HVAC_MODE: []},
pytest.raises(vol.Invalid),
),
(
"climate.hvac_mode_changed",
# Missing CONF_HVAC_MODE
{},
pytest.raises(vol.Invalid),
),
(
"climate.hvac_mode_changed",
{CONF_HVAC_MODE: ["invalid_mode"]},
pytest.raises(vol.Invalid),
),
],
)
async def test_climate_trigger_validation(
hass: HomeAssistant,
trigger: str,
trigger_options: dict[str, Any],
expected_result: AbstractContextManager,
) -> None:
"""Test climate trigger config validation."""
with expected_result:
await async_validate_trigger_config(
hass,
[
{
"platform": trigger,
CONF_TARGET: {CONF_ENTITY_ID: "climate.test_climate"},
CONF_OPTIONS: trigger_options,
}
],
)
def parametrize_climate_trigger_states(
*,
trigger: str,
trigger_options: dict | None = None,
target_states: list[str | None | tuple[str | None, dict]],
other_states: list[str | None | tuple[str | None, dict]],
additional_attributes: dict | None = None,
trigger_from_none: bool = True,
) -> list[tuple[str, dict[str, Any], list[StateDescription]]]:
"""Parametrize states and expected service call counts."""
trigger_options = trigger_options or {}
return [
(s[0], trigger_options, *s[1:])
for s in parametrize_trigger_states(
trigger=trigger,
target_states=target_states,
other_states=other_states,
additional_attributes=additional_attributes,
trigger_from_none=trigger_from_none,
)
]
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities("climate"),
)
@pytest.mark.parametrize(
("trigger", "states"),
("trigger", "trigger_options", "states"),
[
*parametrize_trigger_states(
*parametrize_climate_trigger_states(
trigger="climate.hvac_mode_changed",
trigger_options={CONF_HVAC_MODE: [HVACMode.HEAT, HVACMode.COOL]},
target_states=[HVACMode.HEAT, HVACMode.COOL],
other_states=other_states([HVACMode.HEAT, HVACMode.COOL]),
),
*parametrize_climate_trigger_states(
trigger="climate.turned_off",
target_states=[HVACMode.OFF],
other_states=other_states(HVACMode.OFF),
),
*parametrize_trigger_states(
*parametrize_climate_trigger_states(
trigger="climate.turned_on",
target_states=[
HVACMode.AUTO,
@@ -103,6 +194,7 @@ async def test_climate_state_trigger_behavior_any(
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[StateDescription],
) -> None:
"""Test that the climate state trigger fires when any climate state changes to a specific state."""
@@ -113,7 +205,7 @@ async def test_climate_state_trigger_behavior_any(
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {}, trigger_target_config)
await arm_trigger(hass, trigger, trigger_options, trigger_target_config)
for state in states[1:]:
included_state = state["included"]
@@ -200,14 +292,20 @@ async def test_climate_state_attribute_trigger_behavior_any(
parametrize_target_entities("climate"),
)
@pytest.mark.parametrize(
("trigger", "states"),
("trigger", "trigger_options", "states"),
[
*parametrize_trigger_states(
*parametrize_climate_trigger_states(
trigger="climate.hvac_mode_changed",
trigger_options={CONF_HVAC_MODE: [HVACMode.HEAT, HVACMode.COOL]},
target_states=[HVACMode.HEAT, HVACMode.COOL],
other_states=other_states([HVACMode.HEAT, HVACMode.COOL]),
),
*parametrize_climate_trigger_states(
trigger="climate.turned_off",
target_states=[HVACMode.OFF],
other_states=other_states(HVACMode.OFF),
),
*parametrize_trigger_states(
*parametrize_climate_trigger_states(
trigger="climate.turned_on",
target_states=[
HVACMode.AUTO,
@@ -231,6 +329,7 @@ async def test_climate_state_trigger_behavior_first(
entities_in_target: int,
entity_id: str,
trigger: str,
trigger_options: dict[str, Any],
states: list[StateDescription],
) -> None:
"""Test that the climate state trigger fires when the first climate changes to a specific state."""
@@ -241,7 +340,9 @@ async def test_climate_state_trigger_behavior_first(
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {"behavior": "first"}, trigger_target_config)
await arm_trigger(
hass, trigger, {"behavior": "first"} | trigger_options, trigger_target_config
)
for state in states[1:]:
included_state = state["included"]
@@ -294,7 +395,7 @@ async def test_climate_state_attribute_trigger_behavior_first(
trigger: str,
states: list[tuple[tuple[str, dict], int]],
) -> None:
"""Test that the climate state trigger fires when any climate state changes to a specific state."""
"""Test that the climate state trigger fires when the first climate state changes to a specific state."""
other_entity_ids = set(target_climates) - {entity_id}
# Set all climates, including the tested climate, to the initial state
@@ -326,14 +427,20 @@ async def test_climate_state_attribute_trigger_behavior_first(
parametrize_target_entities("climate"),
)
@pytest.mark.parametrize(
("trigger", "states"),
("trigger", "trigger_options", "states"),
[
*parametrize_trigger_states(
*parametrize_climate_trigger_states(
trigger="climate.hvac_mode_changed",
trigger_options={CONF_HVAC_MODE: [HVACMode.HEAT, HVACMode.COOL]},
target_states=[HVACMode.HEAT, HVACMode.COOL],
other_states=other_states([HVACMode.HEAT, HVACMode.COOL]),
),
*parametrize_climate_trigger_states(
trigger="climate.turned_off",
target_states=[HVACMode.OFF],
other_states=other_states(HVACMode.OFF),
),
*parametrize_trigger_states(
*parametrize_climate_trigger_states(
trigger="climate.turned_on",
target_states=[
HVACMode.AUTO,
@@ -357,6 +464,7 @@ async def test_climate_state_trigger_behavior_last(
entities_in_target: int,
entity_id: str,
trigger: str,
trigger_options: dict[str, Any],
states: list[StateDescription],
) -> None:
"""Test that the climate state trigger fires when the last climate changes to a specific state."""
@@ -367,7 +475,9 @@ async def test_climate_state_trigger_behavior_last(
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {"behavior": "last"}, trigger_target_config)
await arm_trigger(
hass, trigger, {"behavior": "last"} | trigger_options, trigger_target_config
)
for state in states[1:]:
included_state = state["included"]
@@ -419,7 +529,7 @@ async def test_climate_state_attribute_trigger_behavior_last(
trigger: str,
states: list[tuple[tuple[str, dict], int]],
) -> None:
"""Test that the climate state trigger fires when any climate state changes to a specific state."""
"""Test that the climate state trigger fires when the last climate state changes to a specific state."""
other_entity_ids = set(target_climates) - {entity_id}
# Set all climates, including the tested climate, to the initial state

View File

@@ -133,6 +133,7 @@
# name: test_switches[switch.gs012345-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'entity_picture': 'https://lion.lamarzocco.io/img/thing-model/detail/gs3av/gs3av-1.png',
'friendly_name': 'GS012345',
}),
'context': <ANY>,

View File

@@ -46,7 +46,7 @@
"0/40/65532": 0,
"0/40/0": 19,
"0/40/6": "**REDACTED**",
"0/40/1": "Beep Home",
"0/40/1": "TEST_VENDOR",
"0/40/2": 65521,
"0/40/3": "Mock speaker",
"0/40/4": 32768,
@@ -54,7 +54,7 @@
"0/40/8": "1.0",
"0/40/9": 1,
"0/40/10": "1.0",
"0/40/18": "A576929DE6D138DC",
"0/40/18": "123456789",
"0/40/19": {
"0": 3,
"1": 3
@@ -221,6 +221,8 @@
"1/8/65532": 0,
"1/8/65533": 6,
"1/8/0": 47,
"1/8/2": 0,
"1/8/3": 100,
"1/8/17": null,
"1/8/15": 0,
"1/8/65528": [],

View File

@@ -3691,6 +3691,64 @@
'state': '4.0',
})
# ---
# name: test_numbers[speaker][number.mock_speaker_volume-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'max': 100,
'min': 0,
'mode': <NumberMode.SLIDER: 'slider'>,
'step': 1,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'number',
'entity_category': None,
'entity_id': 'number.mock_speaker_volume',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Volume',
'platform': 'matter',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'speaker_setpoint',
'unique_id': '00000000000004D2-000000000000006B-MatterNodeDevice-1-speaker_setpoint-8-0',
'unit_of_measurement': '%',
})
# ---
# name: test_numbers[speaker][number.mock_speaker_volume-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Mock speaker Volume',
'max': 100,
'min': 0,
'mode': <NumberMode.SLIDER: 'slider'>,
'step': 1,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'number.mock_speaker_volume',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '47',
})
# ---
# name: test_numbers[valve][number.valve_default_open_duration-entry]
EntityRegistryEntrySnapshot({
'aliases': set({

View File

@@ -4,68 +4,21 @@ from collections.abc import AsyncGenerator
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
import pytest
import tibber
from homeassistant.components.application_credentials import (
ClientCredential,
async_import_client_credential,
)
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber import TibberRuntimeData
from homeassistant.components.tibber.const import AUTH_IMPLEMENTATION, DOMAIN
from homeassistant.components.tibber.coordinator import TibberDataAPICoordinator
from homeassistant.components.tibber.const import DOMAIN
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry
def create_tibber_device(
device_id: str = "device-id",
external_id: str = "external-id",
name: str = "Test Device",
brand: str = "Tibber",
model: str = "Gen1",
value: float | None = 72.0,
home_id: str = "home-id",
) -> tibber.data_api.TibberDevice:
"""Create a fake Tibber Data API device."""
device_data = {
"id": device_id,
"externalId": external_id,
"info": {
"name": name,
"brand": brand,
"model": model,
},
"capabilities": [
{
"id": "storage.stateOfCharge",
"value": value,
"description": "State of charge",
"unit": "%",
},
{
"id": "unknown.sensor.id",
"value": None,
"description": "Unknown",
"unit": "",
},
],
}
return tibber.data_api.TibberDevice(device_data, home_id=home_id)
@pytest.fixture
def config_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Tibber config entry."""
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_ACCESS_TOKEN: "token",
AUTH_IMPLEMENTATION: DOMAIN,
},
data={CONF_ACCESS_TOKEN: "token"},
unique_id="tibber",
)
config_entry.add_to_hass(hass)
@@ -86,88 +39,8 @@ async def mock_tibber_setup(
tibber_mock.name = PropertyMock(return_value=title)
tibber_mock.send_notification = AsyncMock()
tibber_mock.rt_disconnect = AsyncMock()
tibber_mock.get_homes = MagicMock(return_value=[])
session_mock = MagicMock()
session_mock.async_ensure_token_valid = AsyncMock()
session_mock.token = {CONF_ACCESS_TOKEN: "test-token"}
implementation_mock = MagicMock()
data_api_client_mock = MagicMock()
data_api_client_mock.get_all_devices = AsyncMock(return_value={})
data_api_client_mock.update_devices = AsyncMock(return_value={})
with (
patch("tibber.Tibber", return_value=tibber_mock),
patch(
"homeassistant.components.tibber.async_get_config_entry_implementation",
return_value=implementation_mock,
),
patch(
"homeassistant.components.tibber.OAuth2Session",
return_value=session_mock,
),
patch(
"homeassistant.components.tibber.coordinator.TibberDataAPICoordinator._async_get_client",
return_value=data_api_client_mock,
),
):
with patch("tibber.Tibber", return_value=tibber_mock):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
yield tibber_mock
@pytest.fixture
async def setup_credentials(recorder_mock: Recorder, hass: HomeAssistant) -> None:
"""Set up application credentials for the OAuth flow."""
assert await async_setup_component(hass, "application_credentials", {})
await async_import_client_credential(
hass,
DOMAIN,
ClientCredential("test-client-id", "test-client-secret"),
DOMAIN,
)
@pytest.fixture
def data_api_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Create a Data API Tibber config entry."""
entry = MockConfigEntry(
domain=DOMAIN,
data={CONF_ACCESS_TOKEN: "token"},
unique_id="data-api",
)
entry.add_to_hass(hass)
return entry
def create_mock_runtime(
async_get_client: AsyncMock | None = None,
tibber_connection: MagicMock | None = None,
coordinator_data: dict | None = None,
) -> TibberRuntimeData:
"""Create a mock TibberRuntimeData.
Args:
async_get_client: Optional async mock for getting the Data API client.
tibber_connection: Optional mock for the GraphQL connection.
coordinator_data: Optional data dict for the coordinator.
"""
session = MagicMock()
session.async_ensure_token_valid = AsyncMock()
session.token = {CONF_ACCESS_TOKEN: "test-token"}
coordinator = MagicMock(spec=TibberDataAPICoordinator)
coordinator.data = coordinator_data if coordinator_data is not None else {}
coordinator.sensors_by_device = {}
runtime = MagicMock(spec=TibberRuntimeData)
runtime.session = session
runtime.tibber_connection = tibber_connection or MagicMock()
runtime.tibber_connection.get_homes = MagicMock(return_value=[])
runtime.data_api_coordinator = coordinator
runtime.async_get_client = async_get_client or AsyncMock()
return runtime

View File

@@ -1,9 +1,7 @@
"""Tests for Tibber config flow."""
import builtins
from http import HTTPStatus
from unittest.mock import AsyncMock, MagicMock, patch
from urllib.parse import parse_qs, urlparse
from asyncio import TimeoutError
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
from aiohttp import ClientError
import pytest
@@ -15,22 +13,16 @@ from tibber import (
from homeassistant import config_entries
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber.application_credentials import TOKEN_URL
from homeassistant.components.tibber.config_flow import (
DATA_API_DEFAULT_SCOPES,
ERR_CLIENT,
ERR_TIMEOUT,
ERR_TOKEN,
)
from homeassistant.components.tibber.const import AUTH_IMPLEMENTATION, DOMAIN
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN
from homeassistant.components.tibber.const import DOMAIN
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker
from tests.typing import ClientSessionGenerator
@pytest.fixture(name="tibber_setup", autouse=True)
def tibber_setup_fixture():
@@ -39,22 +31,6 @@ def tibber_setup_fixture():
yield
def _mock_tibber(
*,
user_id: str = "unique_user_id",
title: str = "Mock Name",
update_side_effect: Exception | None = None,
) -> MagicMock:
"""Return a mocked Tibber GraphQL client."""
tibber_mock = MagicMock()
tibber_mock.user_id = user_id
tibber_mock.name = title
tibber_mock.update_info = AsyncMock()
if update_side_effect is not None:
tibber_mock.update_info.side_effect = update_side_effect
return tibber_mock
async def test_show_config_form(recorder_mock: Recorder, hass: HomeAssistant) -> None:
"""Test show configuration form."""
result = await hass.config_entries.flow.async_init(
@@ -65,35 +41,62 @@ async def test_show_config_form(recorder_mock: Recorder, hass: HomeAssistant) ->
assert result["step_id"] == "user"
async def test_create_entry(recorder_mock: Recorder, hass: HomeAssistant) -> None:
"""Test create entry from user input."""
test_data = {
CONF_ACCESS_TOKEN: "valid",
}
unique_user_id = "unique_user_id"
title = "title"
tibber_mock = MagicMock()
type(tibber_mock).update_info = AsyncMock(return_value=True)
type(tibber_mock).user_id = PropertyMock(return_value=unique_user_id)
type(tibber_mock).name = PropertyMock(return_value=title)
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == title
assert result["data"] == test_data
@pytest.mark.parametrize(
("exception", "expected_error"),
[
(builtins.TimeoutError(), ERR_TIMEOUT),
(ClientError(), ERR_CLIENT),
(TimeoutError, ERR_TIMEOUT),
(ClientError, ERR_CLIENT),
(InvalidLoginError(401), ERR_TOKEN),
(RetryableHttpExceptionError(503), ERR_CLIENT),
(FatalHttpExceptionError(404), ERR_CLIENT),
],
)
async def test_graphql_step_exceptions(
recorder_mock: Recorder,
hass: HomeAssistant,
exception: Exception,
expected_error: str,
async def test_create_entry_exceptions(
recorder_mock: Recorder, hass: HomeAssistant, exception, expected_error
) -> None:
"""Validate GraphQL errors are surfaced."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
"""Test create entry from user input."""
test_data = {
CONF_ACCESS_TOKEN: "valid",
}
unique_user_id = "unique_user_id"
title = "title"
tibber_mock = MagicMock()
type(tibber_mock).update_info = AsyncMock(side_effect=exception)
type(tibber_mock).user_id = PropertyMock(return_value=unique_user_id)
type(tibber_mock).name = PropertyMock(return_value=title)
tibber_mock = _mock_tibber(update_side_effect=exception)
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "invalid"}
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"][CONF_ACCESS_TOKEN] == expected_error
@@ -101,203 +104,13 @@ async def test_flow_entry_already_exists(
recorder_mock: Recorder, hass: HomeAssistant, config_entry
) -> None:
"""Test user input for config_entry that already exists."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
test_data = {
CONF_ACCESS_TOKEN: "valid",
}
tibber_mock = _mock_tibber(user_id="tibber")
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_reauth_flow_steps(
recorder_mock: Recorder,
hass: HomeAssistant,
config_entry: MockConfigEntry,
) -> None:
"""Test the reauth flow goes through reauth_confirm to user step."""
reauth_flow = await config_entry.start_reauth_flow(hass)
assert reauth_flow["type"] is FlowResultType.FORM
assert reauth_flow["step_id"] == "reauth_confirm"
result = await hass.config_entries.flow.async_configure(reauth_flow["flow_id"])
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
result = await hass.config_entries.flow.async_configure(
reauth_flow["flow_id"],
user_input={},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
async def test_oauth_create_entry_missing_configuration(
recorder_mock: Recorder,
hass: HomeAssistant,
) -> None:
"""Abort OAuth finalize if GraphQL step did not run."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
handler = hass.config_entries.flow._progress[result["flow_id"]]
flow_result = await handler.async_oauth_create_entry(
{CONF_TOKEN: {CONF_ACCESS_TOKEN: "rest-token"}}
)
assert flow_result["type"] is FlowResultType.ABORT
assert flow_result["reason"] == "missing_configuration"
async def test_oauth_create_entry_cannot_connect_userinfo(
recorder_mock: Recorder,
hass: HomeAssistant,
) -> None:
"""Abort OAuth finalize when Data API userinfo cannot be retrieved."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
handler = hass.config_entries.flow._progress[result["flow_id"]]
handler._access_token = "graphql-token"
data_api_client = MagicMock()
data_api_client.get_userinfo = AsyncMock(side_effect=ClientError())
with patch(
"homeassistant.components.tibber.config_flow.tibber_data_api.TibberDataAPI",
return_value=data_api_client,
):
flow_result = await handler.async_oauth_create_entry(
{CONF_TOKEN: {CONF_ACCESS_TOKEN: "rest-token"}}
)
assert flow_result["type"] is FlowResultType.ABORT
assert flow_result["reason"] == "cannot_connect"
async def test_data_api_requires_credentials(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""Abort when OAuth credentials are missing."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
tibber_mock = _mock_tibber()
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "missing_credentials"
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
async def test_data_api_extra_authorize_scope(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""Ensure the OAuth implementation requests Tibber scopes."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
tibber_mock = _mock_tibber()
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
)
handler = hass.config_entries.flow._progress[result["flow_id"]]
assert handler.extra_authorize_data["scope"] == " ".join(DATA_API_DEFAULT_SCOPES)
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
async def test_full_flow_success(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test configuring Tibber via GraphQL + OAuth."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
tibber_mock = _mock_tibber()
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "graphql-token"}
)
assert result["type"] is FlowResultType.EXTERNAL_STEP
authorize_url = result["url"]
state = parse_qs(urlparse(authorize_url).query)["state"][0]
client = await hass_client_no_auth()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == HTTPStatus.OK
aioclient_mock.post(
TOKEN_URL,
json={
"access_token": "mock-access-token",
"refresh_token": "mock-refresh-token",
"token_type": "bearer",
"expires_in": 3600,
},
)
data_api_client = MagicMock()
data_api_client.get_userinfo = AsyncMock(return_value={"name": "Mock Name"})
with patch(
"homeassistant.components.tibber.config_flow.tibber_data_api.TibberDataAPI",
return_value=data_api_client,
):
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] is FlowResultType.CREATE_ENTRY
data = result["data"]
assert data[CONF_TOKEN]["access_token"] == "mock-access-token"
assert data[CONF_ACCESS_TOKEN] == "graphql-token"
assert data[AUTH_IMPLEMENTATION] == DOMAIN
assert result["title"] == "Mock Name"
async def test_data_api_abort_when_already_configured(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""Ensure only a single Data API entry can be configured."""
existing_entry = MockConfigEntry(
domain=DOMAIN,
data={
AUTH_IMPLEMENTATION: DOMAIN,
CONF_TOKEN: {"access_token": "existing"},
CONF_ACCESS_TOKEN: "stored-graphql",
},
unique_id="unique_user_id",
title="Existing",
)
existing_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
tibber_mock = _mock_tibber()
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "new-token"}
with patch("tibber.Tibber.update_info", return_value=None):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
)
assert result["type"] is FlowResultType.ABORT

View File

@@ -1,121 +0,0 @@
"""Tests for the Tibber Data API coordinator and sensors."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import aiohttp
import pytest
import tibber
from homeassistant.components.tibber.coordinator import TibberDataAPICoordinator
from homeassistant.components.tibber.sensor import (
TibberDataAPISensor,
_setup_data_api_sensors,
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.update_coordinator import UpdateFailed
from .conftest import create_mock_runtime, create_tibber_device
from tests.common import MockConfigEntry
async def test_data_api_setup_adds_entities(
hass: HomeAssistant,
data_api_entry: MockConfigEntry,
) -> None:
"""Ensure Data API sensors are created and coordinator refreshes data."""
client = MagicMock()
client.get_all_devices = AsyncMock(
return_value={"device-id": create_tibber_device(value=72.0)}
)
client.update_devices = AsyncMock(
return_value={"device-id": create_tibber_device(value=83.0)}
)
async_get_client = AsyncMock(return_value=client)
runtime = create_mock_runtime(async_get_client=async_get_client)
data_api_entry.runtime_data = runtime
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
coordinator = TibberDataAPICoordinator(hass, data_api_entry)
await coordinator.async_config_entry_first_refresh()
runtime.data_api_coordinator = coordinator
added_entities: list[TibberDataAPISensor] = []
def async_add_entities(entities: list[TibberDataAPISensor]) -> None:
added_entities.extend(entities)
_setup_data_api_sensors(data_api_entry, async_add_entities)
assert async_get_client.await_count == 2
client.get_all_devices.assert_awaited_once()
client.update_devices.assert_awaited_once()
assert len(added_entities) == 1
sensor = added_entities[0]
assert sensor.entity_description.key == "storage.stateOfCharge"
assert sensor.native_value == 83.0
assert sensor.available
sensor.coordinator.data = {}
sensor.coordinator.sensors_by_device = {}
assert sensor.native_value is None
assert not sensor.available
async def test_data_api_coordinator_first_refresh_failure(
hass: HomeAssistant, data_api_entry: MockConfigEntry
) -> None:
"""Ensure network failures during setup raise ConfigEntryNotReady."""
async_get_client = AsyncMock(side_effect=aiohttp.ClientError("boom"))
runtime = create_mock_runtime(async_get_client=async_get_client)
data_api_entry.runtime_data = runtime
coordinator = TibberDataAPICoordinator(hass, data_api_entry)
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
with pytest.raises(ConfigEntryNotReady):
await coordinator.async_config_entry_first_refresh()
assert isinstance(coordinator.last_exception, UpdateFailed)
async def test_data_api_coordinator_first_refresh_auth_failed(
hass: HomeAssistant, data_api_entry: MockConfigEntry
) -> None:
"""Ensure auth failures during setup propagate."""
async_get_client = AsyncMock(side_effect=ConfigEntryAuthFailed("invalid"))
runtime = create_mock_runtime(async_get_client=async_get_client)
data_api_entry.runtime_data = runtime
coordinator = TibberDataAPICoordinator(hass, data_api_entry)
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
with pytest.raises(ConfigEntryAuthFailed):
await coordinator.async_config_entry_first_refresh()
@pytest.mark.parametrize(
"exception",
[
aiohttp.ClientError("err"),
TimeoutError(),
tibber.UserAgentMissingError("err"),
],
)
async def test_data_api_coordinator_update_failures(
hass: HomeAssistant, data_api_entry: MockConfigEntry, exception: Exception
) -> None:
"""Ensure update failures are wrapped in UpdateFailed."""
async_get_client = AsyncMock(side_effect=exception)
runtime = create_mock_runtime(async_get_client=async_get_client)
data_api_entry.runtime_data = runtime
coordinator = TibberDataAPICoordinator(hass, data_api_entry)
with pytest.raises(UpdateFailed):
await coordinator._async_update_data()

View File

@@ -1,23 +1,13 @@
"""Test the Tibber diagnostics."""
"""Test the Netatmo diagnostics."""
from unittest.mock import MagicMock
import aiohttp
import pytest
from syrupy.assertion import SnapshotAssertion
import tibber
from unittest.mock import patch
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber.diagnostics import (
async_get_config_entry_diagnostics,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.setup import async_setup_component
from .conftest import create_mock_runtime, create_tibber_device
from .test_common import mock_get_homes
from tests.common import MockConfigEntry
from tests.components.diagnostics import get_diagnostics_for_config_entry
from tests.typing import ClientSessionGenerator
@@ -26,93 +16,41 @@ async def test_entry_diagnostics(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
config_entry: MockConfigEntry,
mock_tibber_setup: MagicMock,
snapshot: SnapshotAssertion,
config_entry,
) -> None:
"""Test config entry diagnostics."""
tibber_mock = mock_tibber_setup
tibber_mock.get_homes.return_value = []
with patch(
"tibber.Tibber.update_info",
return_value=None,
):
assert await async_setup_component(hass, "tibber", {})
config_entry.runtime_data.data_api_coordinator.data = {}
config_entry.runtime_data.data_api_coordinator.sensors_by_device = {}
await hass.async_block_till_done()
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
assert result == snapshot(name="empty")
with patch(
"tibber.Tibber.get_homes",
return_value=[],
):
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
tibber_mock.get_homes.side_effect = mock_get_homes
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
assert result == snapshot(name="with_homes")
async def test_data_api_diagnostics_no_data(
recorder_mock: Recorder,
hass: HomeAssistant,
data_api_entry: MockConfigEntry,
snapshot: SnapshotAssertion,
) -> None:
"""Test Data API diagnostics when coordinator has no data."""
data_api_entry.runtime_data = create_mock_runtime()
result = await async_get_config_entry_diagnostics(hass, data_api_entry)
assert result == snapshot
async def test_data_api_diagnostics_with_devices(
recorder_mock: Recorder,
hass: HomeAssistant,
data_api_entry: MockConfigEntry,
snapshot: SnapshotAssertion,
) -> None:
"""Test Data API diagnostics with successful device retrieval."""
devices = {
"device-1": create_tibber_device(
device_id="device-1",
name="Device 1",
brand="Tibber",
model="Test Model",
),
"device-2": create_tibber_device(
device_id="device-2",
name="Device 2",
brand="Tibber",
model="Test Model",
),
assert result == {
"homes": [],
}
data_api_entry.runtime_data = create_mock_runtime(coordinator_data=devices)
with patch(
"tibber.Tibber.get_homes",
side_effect=mock_get_homes,
):
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
result = await async_get_config_entry_diagnostics(hass, data_api_entry)
assert result == snapshot
@pytest.mark.parametrize(
("exception", "expected_error"),
[
(ConfigEntryAuthFailed("Auth failed"), "Authentication failed"),
(TimeoutError(), "Timeout error"),
(aiohttp.ClientError("Connection error"), "Client error"),
(tibber.InvalidLoginError(401), "Invalid login"),
(tibber.RetryableHttpExceptionError(503), "Retryable HTTP error (503)"),
(tibber.FatalHttpExceptionError(404), "Fatal HTTP error (404)"),
],
)
async def test_data_api_diagnostics_exceptions(
recorder_mock: Recorder,
hass: HomeAssistant,
data_api_entry: MockConfigEntry,
exception: Exception,
expected_error: str,
) -> None:
"""Test Data API diagnostics with various exception scenarios."""
runtime = create_mock_runtime()
type(runtime.data_api_coordinator).data = property(
lambda self: (_ for _ in ()).throw(exception)
)
data_api_entry.runtime_data = runtime
result = await async_get_config_entry_diagnostics(hass, data_api_entry)
assert result["error"] == expected_error
assert result["devices"] == []
assert result == {
"homes": [
{
"last_data_timestamp": "2016-01-01T12:48:57",
"has_active_subscription": True,
"has_real_time_consumption": False,
"last_cons_data_timestamp": "2016-01-01T12:44:57",
"country": "NO",
}
],
}

View File

@@ -1,17 +1,11 @@
"""Test loading of the Tibber config entry."""
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
from unittest.mock import MagicMock
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber import DOMAIN, TibberRuntimeData, async_setup_entry
from homeassistant.components.tibber import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from tests.common import MockConfigEntry
async def test_entry_unload(
@@ -25,69 +19,3 @@ async def test_entry_unload(
mock_tibber_setup.rt_disconnect.assert_called_once()
await hass.async_block_till_done(wait_background_tasks=True)
assert entry.state is ConfigEntryState.NOT_LOADED
@pytest.mark.usefixtures("recorder_mock")
async def test_data_api_runtime_creates_client(hass: HomeAssistant) -> None:
"""Ensure the data API runtime creates and caches the client."""
session = MagicMock()
session.async_ensure_token_valid = AsyncMock()
session.token = {CONF_ACCESS_TOKEN: "access-token"}
runtime = TibberRuntimeData(
session=session,
tibber_connection=MagicMock(),
)
with patch(
"homeassistant.components.tibber.tibber_data_api.TibberDataAPI"
) as mock_client_cls:
mock_client = MagicMock()
mock_client.set_access_token = MagicMock()
mock_client_cls.return_value = mock_client
client = await runtime.async_get_client(hass)
mock_client_cls.assert_called_once_with("access-token", websession=ANY)
session.async_ensure_token_valid.assert_awaited_once()
mock_client.set_access_token.assert_called_once_with("access-token")
assert client is mock_client
mock_client.set_access_token.reset_mock()
session.async_ensure_token_valid.reset_mock()
cached_client = await runtime.async_get_client(hass)
mock_client_cls.assert_called_once()
session.async_ensure_token_valid.assert_awaited_once()
mock_client.set_access_token.assert_called_once_with("access-token")
assert cached_client is client
@pytest.mark.usefixtures("recorder_mock")
async def test_data_api_runtime_missing_token_raises(hass: HomeAssistant) -> None:
"""Ensure missing tokens trigger reauthentication."""
session = MagicMock()
session.async_ensure_token_valid = AsyncMock()
session.token = {}
runtime = TibberRuntimeData(
session=session,
tibber_connection=MagicMock(),
)
with pytest.raises(ConfigEntryAuthFailed):
await runtime.async_get_client(hass)
session.async_ensure_token_valid.assert_awaited_once()
async def test_setup_requires_data_api_reauth(hass: HomeAssistant) -> None:
"""Ensure legacy entries trigger reauth to configure Data API."""
entry = MockConfigEntry(
domain=DOMAIN,
data={CONF_ACCESS_TOKEN: "legacy-token"},
unique_id="legacy",
)
with pytest.raises(ConfigEntryAuthFailed):
await async_setup_entry(hass, entry)