mirror of
https://github.com/home-assistant/core.git
synced 2025-12-19 14:28:19 +00:00
Compare commits
7 Commits
tibber_dat
...
improve_cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
897e470ff8 | ||
|
|
6cc7d83def | ||
|
|
5154418051 | ||
|
|
7e63c12b95 | ||
|
|
d17e951591 | ||
|
|
9198e5f56d | ||
|
|
97d7e0e01e |
2
.github/workflows/builder.yml
vendored
2
.github/workflows/builder.yml
vendored
@@ -551,7 +551,7 @@ jobs:
|
||||
|
||||
- name: Generate artifact attestation
|
||||
if: needs.init.outputs.channel != 'dev' && needs.init.outputs.publish == 'true'
|
||||
uses: actions/attest-build-provenance@977bb373ede98d70efdf65b84cb5f73e068dcc2a # v3.0.0
|
||||
uses: actions/attest-build-provenance@00014ed6ed5efc5b1ab7f7f34a39eb55d41aa4f8 # v3.1.0
|
||||
with:
|
||||
subject-name: ${{ env.HASSFEST_IMAGE_NAME }}
|
||||
subject-digest: ${{ steps.push.outputs.digest }}
|
||||
|
||||
@@ -45,7 +45,7 @@ def make_entity_state_trigger_required_features(
|
||||
"""Trigger for entity state changes."""
|
||||
|
||||
_domain = domain
|
||||
_to_state = to_state
|
||||
_to_states = {to_state}
|
||||
_required_features = required_features
|
||||
|
||||
return CustomTrigger
|
||||
|
||||
@@ -47,7 +47,7 @@ def make_binary_sensor_trigger(
|
||||
"""Trigger for entity state changes."""
|
||||
|
||||
_device_class = device_class
|
||||
_to_state = to_state
|
||||
_to_states = {to_state}
|
||||
|
||||
return CustomTrigger
|
||||
|
||||
|
||||
@@ -98,6 +98,9 @@
|
||||
}
|
||||
},
|
||||
"triggers": {
|
||||
"hvac_mode_changed": {
|
||||
"trigger": "mdi:thermostat"
|
||||
},
|
||||
"started_cooling": {
|
||||
"trigger": "mdi:snowflake"
|
||||
},
|
||||
|
||||
@@ -298,6 +298,20 @@
|
||||
},
|
||||
"title": "Climate",
|
||||
"triggers": {
|
||||
"hvac_mode_changed": {
|
||||
"description": "Triggers after the mode of one or more climate-control devices changes.",
|
||||
"fields": {
|
||||
"behavior": {
|
||||
"description": "[%key:component::climate::common::trigger_behavior_description%]",
|
||||
"name": "[%key:component::climate::common::trigger_behavior_name%]"
|
||||
},
|
||||
"hvac_mode": {
|
||||
"description": "The HVAC modes to trigger on.",
|
||||
"name": "Modes"
|
||||
}
|
||||
},
|
||||
"name": "Climate-control device mode changed"
|
||||
},
|
||||
"started_cooling": {
|
||||
"description": "Triggers after one or more climate-control devices start cooling.",
|
||||
"fields": {
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
"""Provides triggers for climates."""
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import CONF_OPTIONS
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.trigger import (
|
||||
ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST,
|
||||
EntityTargetStateTriggerBase,
|
||||
Trigger,
|
||||
TriggerConfig,
|
||||
make_entity_target_state_attribute_trigger,
|
||||
make_entity_target_state_trigger,
|
||||
make_entity_transition_trigger,
|
||||
@@ -10,7 +17,33 @@ from homeassistant.helpers.trigger import (
|
||||
|
||||
from .const import ATTR_HVAC_ACTION, DOMAIN, HVACAction, HVACMode
|
||||
|
||||
CONF_HVAC_MODE = "hvac_mode"
|
||||
|
||||
HVAC_MODE_CHANGED_TRIGGER_SCHEMA = ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST.extend(
|
||||
{
|
||||
vol.Required(CONF_OPTIONS): {
|
||||
vol.Required(CONF_HVAC_MODE): vol.All(
|
||||
cv.ensure_list, vol.Length(min=1), [HVACMode]
|
||||
),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class HVACModeChangedTrigger(EntityTargetStateTriggerBase):
|
||||
"""Trigger for entity state changes."""
|
||||
|
||||
_domain = DOMAIN
|
||||
_schema = HVAC_MODE_CHANGED_TRIGGER_SCHEMA
|
||||
|
||||
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
|
||||
"""Initialize the state trigger."""
|
||||
super().__init__(hass, config)
|
||||
self._to_states = set(self._options[CONF_HVAC_MODE])
|
||||
|
||||
|
||||
TRIGGERS: dict[str, type[Trigger]] = {
|
||||
"hvac_mode_changed": HVACModeChangedTrigger,
|
||||
"started_cooling": make_entity_target_state_attribute_trigger(
|
||||
DOMAIN, ATTR_HVAC_ACTION, HVACAction.COOLING
|
||||
),
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
.trigger_common: &trigger_common
|
||||
target:
|
||||
target: &trigger_climate_target
|
||||
entity:
|
||||
domain: climate
|
||||
fields:
|
||||
behavior:
|
||||
behavior: &trigger_behavior
|
||||
required: true
|
||||
default: any
|
||||
selector:
|
||||
@@ -19,3 +19,18 @@ started_drying: *trigger_common
|
||||
started_heating: *trigger_common
|
||||
turned_off: *trigger_common
|
||||
turned_on: *trigger_common
|
||||
|
||||
hvac_mode_changed:
|
||||
target: *trigger_climate_target
|
||||
fields:
|
||||
behavior: *trigger_behavior
|
||||
hvac_mode:
|
||||
context:
|
||||
filter_target: target
|
||||
required: true
|
||||
selector:
|
||||
state:
|
||||
# Note: This should allow selecting multiple modes, but state selector does not support that yet.
|
||||
hide_states:
|
||||
- unavailable
|
||||
- unknown
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
{ "registered_devices": true }
|
||||
],
|
||||
"documentation": "https://www.home-assistant.io/integrations/incomfort",
|
||||
"integration_type": "hub",
|
||||
"iot_class": "local_polling",
|
||||
"loggers": ["incomfortclient"],
|
||||
"quality_scale": "platinum",
|
||||
|
||||
@@ -39,19 +39,6 @@ class LaMarzoccoSwitchEntityDescription(
|
||||
|
||||
|
||||
ENTITIES: tuple[LaMarzoccoSwitchEntityDescription, ...] = (
|
||||
LaMarzoccoSwitchEntityDescription(
|
||||
key="main",
|
||||
translation_key="main",
|
||||
name=None,
|
||||
control_fn=lambda machine, state: machine.set_power(state),
|
||||
is_on_fn=(
|
||||
lambda machine: cast(
|
||||
MachineStatus, machine.dashboard.config[WidgetType.CM_MACHINE_STATUS]
|
||||
).mode
|
||||
is MachineMode.BREWING_MODE
|
||||
),
|
||||
bt_offline_mode=True,
|
||||
),
|
||||
LaMarzoccoSwitchEntityDescription(
|
||||
key="steam_boiler_enable",
|
||||
translation_key="steam_boiler",
|
||||
@@ -98,6 +85,20 @@ ENTITIES: tuple[LaMarzoccoSwitchEntityDescription, ...] = (
|
||||
),
|
||||
)
|
||||
|
||||
MAIN_SWITCH_ENTITY = LaMarzoccoSwitchEntityDescription(
|
||||
key="main",
|
||||
translation_key="main",
|
||||
name=None,
|
||||
control_fn=lambda machine, state: machine.set_power(state),
|
||||
is_on_fn=(
|
||||
lambda machine: cast(
|
||||
MachineStatus, machine.dashboard.config[WidgetType.CM_MACHINE_STATUS]
|
||||
).mode
|
||||
is MachineMode.BREWING_MODE
|
||||
),
|
||||
bt_offline_mode=True,
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
@@ -107,12 +108,11 @@ async def async_setup_entry(
|
||||
"""Set up switch entities and services."""
|
||||
|
||||
coordinator = entry.runtime_data.config_coordinator
|
||||
bluetooth_coordinator = entry.runtime_data.bluetooth_coordinator
|
||||
|
||||
entities: list[SwitchEntity] = []
|
||||
entities.extend(
|
||||
LaMarzoccoSwitchEntity(
|
||||
coordinator, description, entry.runtime_data.bluetooth_coordinator
|
||||
)
|
||||
LaMarzoccoSwitchEntity(coordinator, description, bluetooth_coordinator)
|
||||
for description in ENTITIES
|
||||
if description.supported_fn(coordinator)
|
||||
)
|
||||
@@ -122,6 +122,12 @@ async def async_setup_entry(
|
||||
for wake_up_sleep_entry in coordinator.device.schedule.smart_wake_up_sleep.schedules
|
||||
)
|
||||
|
||||
entities.append(
|
||||
LaMarzoccoMainSwitchEntity(
|
||||
coordinator, MAIN_SWITCH_ENTITY, bluetooth_coordinator
|
||||
)
|
||||
)
|
||||
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
@@ -160,6 +166,17 @@ class LaMarzoccoSwitchEntity(LaMarzoccoEntity, SwitchEntity):
|
||||
return self.entity_description.is_on_fn(self.coordinator.device)
|
||||
|
||||
|
||||
class LaMarzoccoMainSwitchEntity(LaMarzoccoSwitchEntity):
|
||||
"""Switch representing espresso machine main power."""
|
||||
|
||||
@property
|
||||
def entity_picture(self) -> str | None:
|
||||
"""Return the entity picture."""
|
||||
|
||||
image_url = self.coordinator.device.dashboard.image_url
|
||||
return image_url if image_url else None # image URL can be empty string
|
||||
|
||||
|
||||
class LaMarzoccoAutoOnOffSwitchEntity(LaMarzoccoBaseEntity, SwitchEntity):
|
||||
"""Switch representing espresso machine auto on/off."""
|
||||
|
||||
|
||||
@@ -42,6 +42,9 @@
|
||||
"number": {
|
||||
"cook_time": {
|
||||
"default": "mdi:microwave"
|
||||
},
|
||||
"speaker_setpoint": {
|
||||
"default": "mdi:speaker"
|
||||
}
|
||||
},
|
||||
"select": {
|
||||
|
||||
@@ -365,6 +365,31 @@ DISCOVERY_SCHEMAS = [
|
||||
clusters.MicrowaveOvenControl.Attributes.MaxCookTime,
|
||||
),
|
||||
),
|
||||
MatterDiscoverySchema(
|
||||
platform=Platform.NUMBER,
|
||||
entity_description=MatterRangeNumberEntityDescription(
|
||||
key="speaker_setpoint",
|
||||
translation_key="speaker_setpoint",
|
||||
native_unit_of_measurement=PERCENTAGE,
|
||||
command=lambda value: clusters.LevelControl.Commands.MoveToLevel(
|
||||
level=int(value)
|
||||
),
|
||||
native_min_value=0,
|
||||
native_max_value=100,
|
||||
native_step=1,
|
||||
device_to_ha=lambda x: None if x is None else x,
|
||||
min_attribute=clusters.LevelControl.Attributes.MinLevel,
|
||||
max_attribute=clusters.LevelControl.Attributes.MaxLevel,
|
||||
mode=NumberMode.SLIDER,
|
||||
),
|
||||
entity_class=MatterRangeNumber,
|
||||
required_attributes=(
|
||||
clusters.LevelControl.Attributes.CurrentLevel,
|
||||
clusters.LevelControl.Attributes.MinLevel,
|
||||
clusters.LevelControl.Attributes.MaxLevel,
|
||||
),
|
||||
device_type=(device_types.Speaker,),
|
||||
),
|
||||
MatterDiscoverySchema(
|
||||
platform=Platform.NUMBER,
|
||||
entity_description=MatterNumberEntityDescription(
|
||||
|
||||
@@ -232,6 +232,9 @@
|
||||
"pump_setpoint": {
|
||||
"name": "Setpoint"
|
||||
},
|
||||
"speaker_setpoint": {
|
||||
"name": "Volume"
|
||||
},
|
||||
"temperature_offset": {
|
||||
"name": "Temperature offset"
|
||||
},
|
||||
|
||||
@@ -1,36 +1,20 @@
|
||||
"""Support for Tibber."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
import logging
|
||||
|
||||
import aiohttp
|
||||
from aiohttp.client_exceptions import ClientError, ClientResponseError
|
||||
import tibber
|
||||
from tibber import data_api as tibber_data_api
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN, EVENT_HOMEASSISTANT_STOP, Platform
|
||||
from homeassistant.core import Event, HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
||||
from homeassistant.exceptions import ConfigEntryNotReady
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.config_entry_oauth2_flow import (
|
||||
ImplementationUnavailableError,
|
||||
OAuth2Session,
|
||||
async_get_config_entry_implementation,
|
||||
)
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.util import dt as dt_util, ssl as ssl_util
|
||||
|
||||
from .const import (
|
||||
AUTH_IMPLEMENTATION,
|
||||
CONF_LEGACY_ACCESS_TOKEN,
|
||||
DATA_HASS_CONFIG,
|
||||
DOMAIN,
|
||||
TibberConfigEntry,
|
||||
)
|
||||
from .coordinator import TibberDataAPICoordinator
|
||||
from .const import DATA_HASS_CONFIG, DOMAIN
|
||||
from .services import async_setup_services
|
||||
|
||||
PLATFORMS = [Platform.NOTIFY, Platform.SENSOR]
|
||||
@@ -40,33 +24,6 @@ CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TibberRuntimeData:
|
||||
"""Runtime data for Tibber API entries."""
|
||||
|
||||
tibber_connection: tibber.Tibber
|
||||
session: OAuth2Session
|
||||
data_api_coordinator: TibberDataAPICoordinator | None = field(default=None)
|
||||
_client: tibber_data_api.TibberDataAPI | None = None
|
||||
|
||||
async def async_get_client(
|
||||
self, hass: HomeAssistant
|
||||
) -> tibber_data_api.TibberDataAPI:
|
||||
"""Return an authenticated Tibber Data API client."""
|
||||
await self.session.async_ensure_token_valid()
|
||||
token = self.session.token
|
||||
access_token = token.get(CONF_ACCESS_TOKEN)
|
||||
if not access_token:
|
||||
raise ConfigEntryAuthFailed("Access token missing from OAuth session")
|
||||
if self._client is None:
|
||||
self._client = tibber_data_api.TibberDataAPI(
|
||||
access_token,
|
||||
websession=async_get_clientsession(hass),
|
||||
)
|
||||
self._client.set_access_token(access_token)
|
||||
return self._client
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the Tibber component."""
|
||||
|
||||
@@ -77,23 +34,16 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bool:
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up a config entry."""
|
||||
|
||||
# Added in 2026.1 to migrate existing users to OAuth2 (Tibber Data API).
|
||||
# Can be removed after 2026.5
|
||||
if AUTH_IMPLEMENTATION not in entry.data:
|
||||
raise ConfigEntryAuthFailed(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="data_api_reauth_required",
|
||||
)
|
||||
|
||||
tibber_connection = tibber.Tibber(
|
||||
access_token=entry.data[CONF_LEGACY_ACCESS_TOKEN],
|
||||
access_token=entry.data[CONF_ACCESS_TOKEN],
|
||||
websession=async_get_clientsession(hass),
|
||||
time_zone=dt_util.get_default_time_zone(),
|
||||
ssl=ssl_util.get_default_context(),
|
||||
)
|
||||
hass.data[DOMAIN] = tibber_connection
|
||||
|
||||
async def _close(event: Event) -> None:
|
||||
await tibber_connection.rt_disconnect()
|
||||
@@ -102,6 +52,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bo
|
||||
|
||||
try:
|
||||
await tibber_connection.update_info()
|
||||
|
||||
except (
|
||||
TimeoutError,
|
||||
aiohttp.ClientError,
|
||||
@@ -114,45 +65,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bo
|
||||
except tibber.FatalHttpExceptionError:
|
||||
return False
|
||||
|
||||
try:
|
||||
implementation = await async_get_config_entry_implementation(hass, entry)
|
||||
except ImplementationUnavailableError as err:
|
||||
raise ConfigEntryNotReady(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="oauth2_implementation_unavailable",
|
||||
) from err
|
||||
|
||||
session = OAuth2Session(hass, entry, implementation)
|
||||
try:
|
||||
await session.async_ensure_token_valid()
|
||||
except ClientResponseError as err:
|
||||
if 400 <= err.status < 500:
|
||||
raise ConfigEntryAuthFailed(
|
||||
"OAuth session is not valid, reauthentication required"
|
||||
) from err
|
||||
raise ConfigEntryNotReady from err
|
||||
except ClientError as err:
|
||||
raise ConfigEntryNotReady from err
|
||||
|
||||
entry.runtime_data = TibberRuntimeData(
|
||||
tibber_connection=tibber_connection,
|
||||
session=session,
|
||||
)
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, entry)
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
entry.runtime_data.data_api_coordinator = coordinator
|
||||
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def async_unload_entry(
|
||||
hass: HomeAssistant, config_entry: TibberConfigEntry
|
||||
) -> bool:
|
||||
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
|
||||
"""Unload a config entry."""
|
||||
if unload_ok := await hass.config_entries.async_unload_platforms(
|
||||
unload_ok = await hass.config_entries.async_unload_platforms(
|
||||
config_entry, PLATFORMS
|
||||
):
|
||||
await config_entry.runtime_data.tibber_connection.rt_disconnect()
|
||||
)
|
||||
if unload_ok:
|
||||
tibber_connection = hass.data[DOMAIN]
|
||||
await tibber_connection.rt_disconnect()
|
||||
return unload_ok
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
"""Application credentials platform for Tibber."""
|
||||
|
||||
from homeassistant.components.application_credentials import AuthorizationServer
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
AUTHORIZE_URL = "https://thewall.tibber.com/connect/authorize"
|
||||
TOKEN_URL = "https://thewall.tibber.com/connect/token"
|
||||
|
||||
|
||||
async def async_get_authorization_server(hass: HomeAssistant) -> AuthorizationServer:
|
||||
"""Return authorization server for Tibber Data API."""
|
||||
return AuthorizationServer(
|
||||
authorize_url=AUTHORIZE_URL,
|
||||
token_url=TOKEN_URL,
|
||||
)
|
||||
@@ -2,164 +2,80 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import tibber
|
||||
from tibber import data_api as tibber_data_api
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.config_entries import SOURCE_REAUTH, SOURCE_USER, ConfigFlowResult
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN
|
||||
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.config_entry_oauth2_flow import AbstractOAuth2FlowHandler
|
||||
|
||||
from .const import CONF_LEGACY_ACCESS_TOKEN, DATA_API_DEFAULT_SCOPES, DOMAIN
|
||||
from .const import DOMAIN
|
||||
|
||||
DATA_SCHEMA = vol.Schema({vol.Required(CONF_LEGACY_ACCESS_TOKEN): str})
|
||||
DATA_SCHEMA = vol.Schema({vol.Required(CONF_ACCESS_TOKEN): str})
|
||||
ERR_TIMEOUT = "timeout"
|
||||
ERR_CLIENT = "cannot_connect"
|
||||
ERR_TOKEN = "invalid_access_token"
|
||||
TOKEN_URL = "https://developer.tibber.com/settings/access-token"
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TibberConfigFlow(AbstractOAuth2FlowHandler, domain=DOMAIN):
|
||||
class TibberConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for Tibber integration."""
|
||||
|
||||
VERSION = 1
|
||||
DOMAIN = DOMAIN
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the config flow."""
|
||||
super().__init__()
|
||||
self._access_token: str | None = None
|
||||
self._title = ""
|
||||
|
||||
@property
|
||||
def logger(self) -> logging.Logger:
|
||||
"""Return the logger."""
|
||||
return _LOGGER
|
||||
|
||||
@property
|
||||
def extra_authorize_data(self) -> dict[str, Any]:
|
||||
"""Extra data appended to the authorize URL."""
|
||||
return {
|
||||
**super().extra_authorize_data,
|
||||
"scope": " ".join(DATA_API_DEFAULT_SCOPES),
|
||||
}
|
||||
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle the initial step."""
|
||||
if user_input is None:
|
||||
data_schema = self.add_suggested_values_to_schema(
|
||||
DATA_SCHEMA, {CONF_LEGACY_ACCESS_TOKEN: self._access_token or ""}
|
||||
|
||||
self._async_abort_entries_match()
|
||||
|
||||
if user_input is not None:
|
||||
access_token = user_input[CONF_ACCESS_TOKEN].replace(" ", "")
|
||||
|
||||
tibber_connection = tibber.Tibber(
|
||||
access_token=access_token,
|
||||
websession=async_get_clientsession(self.hass),
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id=SOURCE_USER,
|
||||
data_schema=data_schema,
|
||||
description_placeholders={"url": TOKEN_URL},
|
||||
errors={},
|
||||
)
|
||||
errors = {}
|
||||
|
||||
self._access_token = user_input[CONF_LEGACY_ACCESS_TOKEN].replace(" ", "")
|
||||
tibber_connection = tibber.Tibber(
|
||||
access_token=self._access_token,
|
||||
websession=async_get_clientsession(self.hass),
|
||||
)
|
||||
self._title = tibber_connection.name or "Tibber"
|
||||
try:
|
||||
await tibber_connection.update_info()
|
||||
except TimeoutError:
|
||||
errors[CONF_ACCESS_TOKEN] = ERR_TIMEOUT
|
||||
except tibber.InvalidLoginError:
|
||||
errors[CONF_ACCESS_TOKEN] = ERR_TOKEN
|
||||
except (
|
||||
aiohttp.ClientError,
|
||||
tibber.RetryableHttpExceptionError,
|
||||
tibber.FatalHttpExceptionError,
|
||||
):
|
||||
errors[CONF_ACCESS_TOKEN] = ERR_CLIENT
|
||||
|
||||
errors: dict[str, str] = {}
|
||||
try:
|
||||
await tibber_connection.update_info()
|
||||
except TimeoutError:
|
||||
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_TIMEOUT
|
||||
except tibber.InvalidLoginError:
|
||||
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_TOKEN
|
||||
except (
|
||||
aiohttp.ClientError,
|
||||
tibber.RetryableHttpExceptionError,
|
||||
tibber.FatalHttpExceptionError,
|
||||
):
|
||||
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_CLIENT
|
||||
if errors:
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=DATA_SCHEMA,
|
||||
description_placeholders={"url": TOKEN_URL},
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
if errors:
|
||||
data_schema = self.add_suggested_values_to_schema(
|
||||
DATA_SCHEMA, {CONF_LEGACY_ACCESS_TOKEN: self._access_token or ""}
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id=SOURCE_USER,
|
||||
data_schema=data_schema,
|
||||
description_placeholders={"url": TOKEN_URL},
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
await self.async_set_unique_id(tibber_connection.user_id)
|
||||
|
||||
if self.source == SOURCE_REAUTH:
|
||||
reauth_entry = self._get_reauth_entry()
|
||||
self._abort_if_unique_id_mismatch(
|
||||
reason="wrong_account",
|
||||
description_placeholders={"title": reauth_entry.title},
|
||||
)
|
||||
else:
|
||||
unique_id = tibber_connection.user_id
|
||||
await self.async_set_unique_id(unique_id)
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
return await self.async_step_pick_implementation()
|
||||
|
||||
async def async_step_reauth(
|
||||
self, entry_data: Mapping[str, Any]
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle a reauth flow."""
|
||||
reauth_entry = self._get_reauth_entry()
|
||||
self._access_token = reauth_entry.data.get(CONF_LEGACY_ACCESS_TOKEN)
|
||||
self._title = reauth_entry.title
|
||||
return await self.async_step_reauth_confirm()
|
||||
|
||||
async def async_step_reauth_confirm(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Confirm reauthentication by reusing the user step."""
|
||||
reauth_entry = self._get_reauth_entry()
|
||||
self._access_token = reauth_entry.data.get(CONF_LEGACY_ACCESS_TOKEN)
|
||||
self._title = reauth_entry.title
|
||||
if user_input is None:
|
||||
return self.async_show_form(
|
||||
step_id="reauth_confirm",
|
||||
return self.async_create_entry(
|
||||
title=tibber_connection.name,
|
||||
data={CONF_ACCESS_TOKEN: access_token},
|
||||
)
|
||||
return await self.async_step_user()
|
||||
|
||||
async def async_oauth_create_entry(self, data: dict) -> ConfigFlowResult:
|
||||
"""Finalize the OAuth flow and create the config entry."""
|
||||
if self._access_token is None:
|
||||
return self.async_abort(reason="missing_configuration")
|
||||
|
||||
data[CONF_LEGACY_ACCESS_TOKEN] = self._access_token
|
||||
|
||||
access_token = data[CONF_TOKEN][CONF_ACCESS_TOKEN]
|
||||
data_api_client = tibber_data_api.TibberDataAPI(
|
||||
access_token,
|
||||
websession=async_get_clientsession(self.hass),
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=DATA_SCHEMA,
|
||||
description_placeholders={"url": TOKEN_URL},
|
||||
errors={},
|
||||
)
|
||||
|
||||
try:
|
||||
await data_api_client.get_userinfo()
|
||||
except (aiohttp.ClientError, TimeoutError):
|
||||
return self.async_abort(reason="cannot_connect")
|
||||
|
||||
if self.source == SOURCE_REAUTH:
|
||||
reauth_entry = self._get_reauth_entry()
|
||||
return self.async_update_reload_and_abort(
|
||||
reauth_entry,
|
||||
data=data,
|
||||
title=self._title,
|
||||
)
|
||||
|
||||
return self.async_create_entry(title=self._title, data=data)
|
||||
|
||||
@@ -1,34 +1,5 @@
|
||||
"""Constants for Tibber integration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from . import TibberRuntimeData
|
||||
|
||||
type TibberConfigEntry = ConfigEntry[TibberRuntimeData]
|
||||
|
||||
|
||||
CONF_LEGACY_ACCESS_TOKEN = CONF_ACCESS_TOKEN
|
||||
|
||||
AUTH_IMPLEMENTATION = "auth_implementation"
|
||||
DATA_HASS_CONFIG = "tibber_hass_config"
|
||||
DOMAIN = "tibber"
|
||||
MANUFACTURER = "Tibber"
|
||||
DATA_API_DEFAULT_SCOPES = [
|
||||
"openid",
|
||||
"profile",
|
||||
"email",
|
||||
"offline_access",
|
||||
"data-api-user-read",
|
||||
"data-api-chargers-read",
|
||||
"data-api-energy-systems-read",
|
||||
"data-api-homes-read",
|
||||
"data-api-thermostats-read",
|
||||
"data-api-vehicles-read",
|
||||
"data-api-inverters-read",
|
||||
]
|
||||
|
||||
@@ -4,11 +4,9 @@ from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, cast
|
||||
from typing import cast
|
||||
|
||||
from aiohttp.client_exceptions import ClientError
|
||||
import tibber
|
||||
from tibber.data_api import TibberDataAPI, TibberDevice
|
||||
|
||||
from homeassistant.components.recorder import get_instance
|
||||
from homeassistant.components.recorder.models import (
|
||||
@@ -21,18 +19,15 @@ from homeassistant.components.recorder.statistics import (
|
||||
get_last_statistics,
|
||||
statistics_during_period,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import UnitOfEnergy
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.util.unit_conversion import EnergyConverter
|
||||
|
||||
from .const import DOMAIN
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .const import TibberConfigEntry
|
||||
|
||||
FIVE_YEARS = 5 * 365 * 24
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@@ -41,12 +36,12 @@ _LOGGER = logging.getLogger(__name__)
|
||||
class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
"""Handle Tibber data and insert statistics."""
|
||||
|
||||
config_entry: TibberConfigEntry
|
||||
config_entry: ConfigEntry
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
config_entry: TibberConfigEntry,
|
||||
config_entry: ConfigEntry,
|
||||
tibber_connection: tibber.Tibber,
|
||||
) -> None:
|
||||
"""Initialize the data handler."""
|
||||
@@ -192,64 +187,3 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
unit_of_measurement=unit,
|
||||
)
|
||||
async_add_external_statistics(self.hass, metadata, statistics)
|
||||
|
||||
|
||||
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
|
||||
"""Fetch and cache Tibber Data API device capabilities."""
|
||||
|
||||
config_entry: TibberConfigEntry
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
entry: TibberConfigEntry,
|
||||
) -> None:
|
||||
"""Initialize the coordinator."""
|
||||
super().__init__(
|
||||
hass,
|
||||
_LOGGER,
|
||||
name=f"{DOMAIN} Data API",
|
||||
update_interval=timedelta(minutes=1),
|
||||
config_entry=entry,
|
||||
)
|
||||
self._runtime_data = entry.runtime_data
|
||||
self.sensors_by_device: dict[str, dict[str, tibber.data_api.Sensor]] = {}
|
||||
|
||||
def _build_sensor_lookup(self, devices: dict[str, TibberDevice]) -> None:
|
||||
"""Build sensor lookup dict for efficient access."""
|
||||
self.sensors_by_device = {
|
||||
device_id: {sensor.id: sensor for sensor in device.sensors}
|
||||
for device_id, device in devices.items()
|
||||
}
|
||||
|
||||
def get_sensor(
|
||||
self, device_id: str, sensor_id: str
|
||||
) -> tibber.data_api.Sensor | None:
|
||||
"""Get a sensor by device and sensor ID."""
|
||||
if device_sensors := self.sensors_by_device.get(device_id):
|
||||
return device_sensors.get(sensor_id)
|
||||
return None
|
||||
|
||||
async def _async_get_client(self) -> TibberDataAPI:
|
||||
"""Get the Tibber Data API client with error handling."""
|
||||
try:
|
||||
return await self._runtime_data.async_get_client(self.hass)
|
||||
except ConfigEntryAuthFailed:
|
||||
raise
|
||||
except (ClientError, TimeoutError, tibber.UserAgentMissingError) as err:
|
||||
raise UpdateFailed(
|
||||
f"Unable to create Tibber Data API client: {err}"
|
||||
) from err
|
||||
|
||||
async def _async_setup(self) -> None:
|
||||
"""Initial load of Tibber Data API devices."""
|
||||
client = await self._async_get_client()
|
||||
devices = await client.get_all_devices()
|
||||
self._build_sensor_lookup(devices)
|
||||
|
||||
async def _async_update_data(self) -> dict[str, TibberDevice]:
|
||||
"""Fetch the latest device capabilities from the Tibber Data API."""
|
||||
client = await self._async_get_client()
|
||||
devices: dict[str, TibberDevice] = await client.update_devices()
|
||||
self._build_sensor_lookup(devices)
|
||||
return devices
|
||||
|
||||
@@ -4,22 +4,21 @@ from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import tibber
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
|
||||
from .const import TibberConfigEntry
|
||||
from .const import DOMAIN
|
||||
|
||||
|
||||
async def async_get_config_entry_diagnostics(
|
||||
hass: HomeAssistant, config_entry: TibberConfigEntry
|
||||
hass: HomeAssistant, config_entry: ConfigEntry
|
||||
) -> dict[str, Any]:
|
||||
"""Return diagnostics for a config entry."""
|
||||
tibber_connection: tibber.Tibber = hass.data[DOMAIN]
|
||||
|
||||
runtime = config_entry.runtime_data
|
||||
result: dict[str, Any] = {
|
||||
return {
|
||||
"homes": [
|
||||
{
|
||||
"last_data_timestamp": home.last_data_timestamp,
|
||||
@@ -28,38 +27,6 @@ async def async_get_config_entry_diagnostics(
|
||||
"last_cons_data_timestamp": home.last_cons_data_timestamp,
|
||||
"country": home.country,
|
||||
}
|
||||
for home in runtime.tibber_connection.get_homes(only_active=False)
|
||||
for home in tibber_connection.get_homes(only_active=False)
|
||||
]
|
||||
}
|
||||
|
||||
devices: dict[str, Any] = {}
|
||||
error: str | None = None
|
||||
try:
|
||||
coordinator = runtime.data_api_coordinator
|
||||
if coordinator is not None:
|
||||
devices = coordinator.data
|
||||
except ConfigEntryAuthFailed:
|
||||
error = "Authentication failed"
|
||||
except TimeoutError:
|
||||
error = "Timeout error"
|
||||
except aiohttp.ClientError:
|
||||
error = "Client error"
|
||||
except tibber.InvalidLoginError:
|
||||
error = "Invalid login"
|
||||
except tibber.RetryableHttpExceptionError as err:
|
||||
error = f"Retryable HTTP error ({err.status})"
|
||||
except tibber.FatalHttpExceptionError as err:
|
||||
error = f"Fatal HTTP error ({err.status})"
|
||||
|
||||
result["error"] = error
|
||||
result["devices"] = [
|
||||
{
|
||||
"id": device.id,
|
||||
"name": device.name,
|
||||
"brand": device.brand,
|
||||
"model": device.model,
|
||||
}
|
||||
for device in devices.values()
|
||||
]
|
||||
|
||||
return result
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
"name": "Tibber",
|
||||
"codeowners": ["@danielhiversen"],
|
||||
"config_flow": true,
|
||||
"dependencies": ["application_credentials", "recorder"],
|
||||
"dependencies": ["recorder"],
|
||||
"documentation": "https://www.home-assistant.io/integrations/tibber",
|
||||
"iot_class": "cloud_polling",
|
||||
"loggers": ["tibber"],
|
||||
"requirements": ["pyTibber==0.33.1"]
|
||||
"requirements": ["pyTibber==0.32.2"]
|
||||
}
|
||||
|
||||
@@ -2,25 +2,28 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from tibber import Tibber
|
||||
|
||||
from homeassistant.components.notify import (
|
||||
ATTR_TITLE_DEFAULT,
|
||||
NotifyEntity,
|
||||
NotifyEntityFeature,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||
|
||||
from .const import DOMAIN, TibberConfigEntry
|
||||
from . import DOMAIN
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
entry: TibberConfigEntry,
|
||||
entry: ConfigEntry,
|
||||
async_add_entities: AddConfigEntryEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up the Tibber notification entity."""
|
||||
async_add_entities([TibberNotificationEntity(entry)])
|
||||
async_add_entities([TibberNotificationEntity(entry.entry_id)])
|
||||
|
||||
|
||||
class TibberNotificationEntity(NotifyEntity):
|
||||
@@ -30,14 +33,13 @@ class TibberNotificationEntity(NotifyEntity):
|
||||
_attr_name = DOMAIN
|
||||
_attr_icon = "mdi:message-flash"
|
||||
|
||||
def __init__(self, entry: TibberConfigEntry) -> None:
|
||||
def __init__(self, unique_id: str) -> None:
|
||||
"""Initialize Tibber notify entity."""
|
||||
self._attr_unique_id = entry.entry_id
|
||||
self._entry = entry
|
||||
self._attr_unique_id = unique_id
|
||||
|
||||
async def async_send_message(self, message: str, title: str | None = None) -> None:
|
||||
"""Send a message to Tibber devices."""
|
||||
tibber_connection = self._entry.runtime_data.tibber_connection
|
||||
tibber_connection: Tibber = self.hass.data[DOMAIN]
|
||||
try:
|
||||
await tibber_connection.send_notification(
|
||||
title or ATTR_TITLE_DEFAULT, message
|
||||
|
||||
@@ -10,8 +10,7 @@ from random import randrange
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from tibber import FatalHttpExceptionError, RetryableHttpExceptionError, TibberHome
|
||||
from tibber.data_api import TibberDevice
|
||||
import tibber
|
||||
|
||||
from homeassistant.components.sensor import (
|
||||
SensorDeviceClass,
|
||||
@@ -28,7 +27,6 @@ from homeassistant.const import (
|
||||
UnitOfElectricCurrent,
|
||||
UnitOfElectricPotential,
|
||||
UnitOfEnergy,
|
||||
UnitOfLength,
|
||||
UnitOfPower,
|
||||
)
|
||||
from homeassistant.core import Event, HomeAssistant, callback
|
||||
@@ -43,8 +41,8 @@ from homeassistant.helpers.update_coordinator import (
|
||||
)
|
||||
from homeassistant.util import Throttle, dt as dt_util
|
||||
|
||||
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
|
||||
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
|
||||
from .const import DOMAIN, MANUFACTURER
|
||||
from .coordinator import TibberDataCoordinator
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -262,65 +260,14 @@ SENSORS: tuple[SensorEntityDescription, ...] = (
|
||||
)
|
||||
|
||||
|
||||
DATA_API_SENSORS: tuple[SensorEntityDescription, ...] = (
|
||||
SensorEntityDescription(
|
||||
key="storage.stateOfCharge",
|
||||
translation_key="storage_state_of_charge",
|
||||
device_class=SensorDeviceClass.BATTERY,
|
||||
native_unit_of_measurement=PERCENTAGE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="storage.targetStateOfCharge",
|
||||
translation_key="storage_target_state_of_charge",
|
||||
device_class=SensorDeviceClass.BATTERY,
|
||||
native_unit_of_measurement=PERCENTAGE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="range.remaining",
|
||||
translation_key="range_remaining",
|
||||
device_class=SensorDeviceClass.DISTANCE,
|
||||
native_unit_of_measurement=UnitOfLength.KILOMETERS,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
suggested_display_precision=1,
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="charging.current.max",
|
||||
translation_key="charging_current_max",
|
||||
device_class=SensorDeviceClass.CURRENT,
|
||||
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="charging.current.offlineFallback",
|
||||
translation_key="charging_current_offline_fallback",
|
||||
device_class=SensorDeviceClass.CURRENT,
|
||||
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
entry: TibberConfigEntry,
|
||||
entry: ConfigEntry,
|
||||
async_add_entities: AddConfigEntryEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up the Tibber sensor."""
|
||||
|
||||
_setup_data_api_sensors(entry, async_add_entities)
|
||||
await _async_setup_graphql_sensors(hass, entry, async_add_entities)
|
||||
|
||||
|
||||
async def _async_setup_graphql_sensors(
|
||||
hass: HomeAssistant,
|
||||
entry: TibberConfigEntry,
|
||||
async_add_entities: AddConfigEntryEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up the Tibber sensor."""
|
||||
|
||||
tibber_connection = entry.runtime_data.tibber_connection
|
||||
tibber_connection = hass.data[DOMAIN]
|
||||
|
||||
entity_registry = er.async_get(hass)
|
||||
device_registry = dr.async_get(hass)
|
||||
@@ -333,11 +280,7 @@ async def _async_setup_graphql_sensors(
|
||||
except TimeoutError as err:
|
||||
_LOGGER.error("Timeout connecting to Tibber home: %s ", err)
|
||||
raise PlatformNotReady from err
|
||||
except (
|
||||
RetryableHttpExceptionError,
|
||||
FatalHttpExceptionError,
|
||||
aiohttp.ClientError,
|
||||
) as err:
|
||||
except (tibber.RetryableHttpExceptionError, aiohttp.ClientError) as err:
|
||||
_LOGGER.error("Error connecting to Tibber home: %s ", err)
|
||||
raise PlatformNotReady from err
|
||||
|
||||
@@ -382,72 +325,7 @@ async def _async_setup_graphql_sensors(
|
||||
device_entry.id, new_identifiers={(DOMAIN, home.home_id)}
|
||||
)
|
||||
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
def _setup_data_api_sensors(
|
||||
entry: TibberConfigEntry,
|
||||
async_add_entities: AddConfigEntryEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up sensors backed by the Tibber Data API."""
|
||||
|
||||
coordinator = entry.runtime_data.data_api_coordinator
|
||||
if coordinator is None:
|
||||
return
|
||||
|
||||
entities: list[TibberDataAPISensor] = []
|
||||
api_sensors = {sensor.key: sensor for sensor in DATA_API_SENSORS}
|
||||
|
||||
for device in coordinator.data.values():
|
||||
for sensor in device.sensors:
|
||||
description: SensorEntityDescription | None = api_sensors.get(sensor.id)
|
||||
if description is None:
|
||||
_LOGGER.debug(
|
||||
"Sensor %s not found in DATA_API_SENSORS, skipping", sensor
|
||||
)
|
||||
continue
|
||||
entities.append(
|
||||
TibberDataAPISensor(
|
||||
coordinator, device, description, sensor.description
|
||||
)
|
||||
)
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
class TibberDataAPISensor(CoordinatorEntity[TibberDataAPICoordinator], SensorEntity):
|
||||
"""Representation of a Tibber Data API capability sensor."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: TibberDataAPICoordinator,
|
||||
device: TibberDevice,
|
||||
entity_description: SensorEntityDescription,
|
||||
name: str,
|
||||
) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(coordinator)
|
||||
|
||||
self._device_id: str = device.id
|
||||
self.entity_description = entity_description
|
||||
self._attr_name = name
|
||||
|
||||
self._attr_unique_id = f"{device.external_id}_{self.entity_description.key}"
|
||||
|
||||
self._attr_device_info = DeviceInfo(
|
||||
identifiers={(DOMAIN, device.external_id)},
|
||||
name=device.name,
|
||||
manufacturer=device.brand,
|
||||
model=device.model,
|
||||
)
|
||||
|
||||
@property
|
||||
def native_value(self) -> StateType:
|
||||
"""Return the value reported by the device."""
|
||||
sensors = self.coordinator.sensors_by_device.get(self._device_id, {})
|
||||
sensor = sensors.get(self.entity_description.key)
|
||||
return sensor.value if sensor else None
|
||||
async_add_entities(entities, True)
|
||||
|
||||
|
||||
class TibberSensor(SensorEntity):
|
||||
@@ -455,7 +333,9 @@ class TibberSensor(SensorEntity):
|
||||
|
||||
_attr_has_entity_name = True
|
||||
|
||||
def __init__(self, *args: Any, tibber_home: TibberHome, **kwargs: Any) -> None:
|
||||
def __init__(
|
||||
self, *args: Any, tibber_home: tibber.TibberHome, **kwargs: Any
|
||||
) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(*args, **kwargs)
|
||||
self._tibber_home = tibber_home
|
||||
@@ -486,7 +366,7 @@ class TibberSensorElPrice(TibberSensor):
|
||||
_attr_state_class = SensorStateClass.MEASUREMENT
|
||||
_attr_translation_key = "electricity_price"
|
||||
|
||||
def __init__(self, tibber_home: TibberHome) -> None:
|
||||
def __init__(self, tibber_home: tibber.TibberHome) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(tibber_home=tibber_home)
|
||||
self._last_updated: datetime.datetime | None = None
|
||||
@@ -563,7 +443,7 @@ class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tibber_home: TibberHome,
|
||||
tibber_home: tibber.TibberHome,
|
||||
coordinator: TibberDataCoordinator,
|
||||
entity_description: SensorEntityDescription,
|
||||
) -> None:
|
||||
@@ -590,7 +470,7 @@ class TibberSensorRT(TibberSensor, CoordinatorEntity["TibberRtDataCoordinator"])
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tibber_home: TibberHome,
|
||||
tibber_home: tibber.TibberHome,
|
||||
description: SensorEntityDescription,
|
||||
initial_state: float,
|
||||
coordinator: TibberRtDataCoordinator,
|
||||
@@ -652,7 +532,7 @@ class TibberRtEntityCreator:
|
||||
def __init__(
|
||||
self,
|
||||
async_add_entities: AddConfigEntryEntitiesCallback,
|
||||
tibber_home: TibberHome,
|
||||
tibber_home: tibber.TibberHome,
|
||||
entity_registry: er.EntityRegistry,
|
||||
) -> None:
|
||||
"""Initialize the data handler."""
|
||||
@@ -738,7 +618,7 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntry,
|
||||
add_sensor_callback: Callable[[TibberRtDataCoordinator, Any], None],
|
||||
tibber_home: TibberHome,
|
||||
tibber_home: tibber.TibberHome,
|
||||
) -> None:
|
||||
"""Initialize the data handler."""
|
||||
self._add_sensor_callback = add_sensor_callback
|
||||
|
||||
@@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import datetime as dt
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Any, Final
|
||||
from typing import Any, Final
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
@@ -20,9 +20,6 @@ from homeassistant.util import dt as dt_util
|
||||
|
||||
from .const import DOMAIN
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .const import TibberConfigEntry
|
||||
|
||||
PRICE_SERVICE_NAME = "get_prices"
|
||||
ATTR_START: Final = "start"
|
||||
ATTR_END: Final = "end"
|
||||
@@ -36,13 +33,7 @@ SERVICE_SCHEMA: Final = vol.Schema(
|
||||
|
||||
|
||||
async def __get_prices(call: ServiceCall) -> ServiceResponse:
|
||||
entries: list[TibberConfigEntry] = call.hass.config_entries.async_entries(DOMAIN)
|
||||
if not entries:
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="no_config_entry",
|
||||
)
|
||||
tibber_connection = entries[0].runtime_data.tibber_connection
|
||||
tibber_connection = call.hass.data[DOMAIN]
|
||||
|
||||
start = __get_date(call.data.get(ATTR_START), "start")
|
||||
end = __get_date(call.data.get(ATTR_END), "end")
|
||||
@@ -66,7 +57,7 @@ async def __get_prices(call: ServiceCall) -> ServiceResponse:
|
||||
selected_data = [
|
||||
price
|
||||
for price in price_data
|
||||
if start <= dt.datetime.fromisoformat(str(price["start_time"])) < end
|
||||
if start <= dt.datetime.fromisoformat(price["start_time"]) < end
|
||||
]
|
||||
tibber_prices[home_nickname] = selected_data
|
||||
|
||||
|
||||
@@ -1,11 +1,7 @@
|
||||
{
|
||||
"config": {
|
||||
"abort": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
|
||||
"missing_configuration": "[%key:common::config_flow::abort::oauth2_missing_configuration%]",
|
||||
"missing_credentials": "[%key:common::config_flow::abort::oauth2_missing_credentials%]",
|
||||
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
|
||||
"wrong_account": "The connected account does not match {title}. Sign in with the same Tibber account and try again."
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]"
|
||||
},
|
||||
"error": {
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
@@ -13,10 +9,6 @@
|
||||
"timeout": "[%key:common::config_flow::error::timeout_connect%]"
|
||||
},
|
||||
"step": {
|
||||
"reauth_confirm": {
|
||||
"description": "Reconnect your Tibber account to refresh access.",
|
||||
"title": "[%key:common::config_flow::title::reauth%]"
|
||||
},
|
||||
"user": {
|
||||
"data": {
|
||||
"access_token": "[%key:common::config_flow::data::access_token%]"
|
||||
@@ -48,12 +40,6 @@
|
||||
"average_power": {
|
||||
"name": "Average power"
|
||||
},
|
||||
"charging_current_max": {
|
||||
"name": "Maximum charging current"
|
||||
},
|
||||
"charging_current_offline_fallback": {
|
||||
"name": "Offline fallback charging current"
|
||||
},
|
||||
"current_l1": {
|
||||
"name": "Current L1"
|
||||
},
|
||||
@@ -102,18 +88,9 @@
|
||||
"power_production": {
|
||||
"name": "Power production"
|
||||
},
|
||||
"range_remaining": {
|
||||
"name": "Remaining range"
|
||||
},
|
||||
"signal_strength": {
|
||||
"name": "Signal strength"
|
||||
},
|
||||
"storage_state_of_charge": {
|
||||
"name": "Storage state of charge"
|
||||
},
|
||||
"storage_target_state_of_charge": {
|
||||
"name": "Storage target state of charge"
|
||||
},
|
||||
"voltage_phase1": {
|
||||
"name": "Voltage phase1"
|
||||
},
|
||||
@@ -126,18 +103,9 @@
|
||||
}
|
||||
},
|
||||
"exceptions": {
|
||||
"data_api_reauth_required": {
|
||||
"message": "Reconnect Tibber so Home Assistant can enable the new Tibber Data API features."
|
||||
},
|
||||
"invalid_date": {
|
||||
"message": "Invalid datetime provided {date}"
|
||||
},
|
||||
"no_config_entry": {
|
||||
"message": "No Tibber integration configured"
|
||||
},
|
||||
"oauth2_implementation_unavailable": {
|
||||
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
|
||||
},
|
||||
"send_message_timeout": {
|
||||
"message": "Timeout sending message with Tibber"
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
"zha",
|
||||
"universal_silabs_flasher"
|
||||
],
|
||||
"requirements": ["zha==0.0.80"],
|
||||
"requirements": ["zha==0.0.81"],
|
||||
"usb": [
|
||||
{
|
||||
"description": "*2652*",
|
||||
|
||||
@@ -38,7 +38,6 @@ APPLICATION_CREDENTIALS = [
|
||||
"smartthings",
|
||||
"spotify",
|
||||
"tesla_fleet",
|
||||
"tibber",
|
||||
"twitch",
|
||||
"volvo",
|
||||
"watts",
|
||||
|
||||
@@ -433,11 +433,21 @@ class EntityTriggerBase(Trigger):
|
||||
class EntityTargetStateTriggerBase(EntityTriggerBase):
|
||||
"""Trigger for entity state changes to a specific state."""
|
||||
|
||||
_to_state: str
|
||||
_to_states: set[str]
|
||||
|
||||
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
|
||||
"""Check if the origin state is valid and the state has changed."""
|
||||
if from_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
|
||||
return False
|
||||
|
||||
return (
|
||||
from_state.state != to_state.state
|
||||
and from_state.state not in self._to_states
|
||||
)
|
||||
|
||||
def is_valid_state(self, state: State) -> bool:
|
||||
"""Check if the new state matches the expected state."""
|
||||
return state.state == self._to_state
|
||||
return state.state in self._to_states
|
||||
|
||||
|
||||
class EntityTransitionTriggerBase(EntityTriggerBase):
|
||||
@@ -495,15 +505,20 @@ class EntityTargetStateAttributeTriggerBase(EntityTriggerBase):
|
||||
|
||||
|
||||
def make_entity_target_state_trigger(
|
||||
domain: str, to_state: str
|
||||
domain: str, to_states: str | set[str]
|
||||
) -> type[EntityTargetStateTriggerBase]:
|
||||
"""Create a trigger for entity state changes to a specific state."""
|
||||
"""Create a trigger for entity state changes to specific state(s)."""
|
||||
|
||||
if isinstance(to_states, str):
|
||||
to_states_set = {to_states}
|
||||
else:
|
||||
to_states_set = to_states
|
||||
|
||||
class CustomTrigger(EntityTargetStateTriggerBase):
|
||||
"""Trigger for entity state changes."""
|
||||
|
||||
_domain = domain
|
||||
_to_state = to_state
|
||||
_to_states = to_states_set
|
||||
|
||||
return CustomTrigger
|
||||
|
||||
|
||||
4
requirements_all.txt
generated
4
requirements_all.txt
generated
@@ -1864,7 +1864,7 @@ pyRFXtrx==0.31.1
|
||||
pySDCP==1
|
||||
|
||||
# homeassistant.components.tibber
|
||||
pyTibber==0.33.1
|
||||
pyTibber==0.32.2
|
||||
|
||||
# homeassistant.components.dlink
|
||||
pyW215==0.8.0
|
||||
@@ -3264,7 +3264,7 @@ zeroconf==0.148.0
|
||||
zeversolar==0.3.2
|
||||
|
||||
# homeassistant.components.zha
|
||||
zha==0.0.80
|
||||
zha==0.0.81
|
||||
|
||||
# homeassistant.components.zhong_hong
|
||||
zhong-hong-hvac==1.0.13
|
||||
|
||||
4
requirements_test_all.txt
generated
4
requirements_test_all.txt
generated
@@ -1592,7 +1592,7 @@ pyHomee==1.3.8
|
||||
pyRFXtrx==0.31.1
|
||||
|
||||
# homeassistant.components.tibber
|
||||
pyTibber==0.33.1
|
||||
pyTibber==0.32.2
|
||||
|
||||
# homeassistant.components.dlink
|
||||
pyW215==0.8.0
|
||||
@@ -2725,7 +2725,7 @@ zeroconf==0.148.0
|
||||
zeversolar==0.3.2
|
||||
|
||||
# homeassistant.components.zha
|
||||
zha==0.0.80
|
||||
zha==0.0.81
|
||||
|
||||
# homeassistant.components.zwave_js
|
||||
zwave-js-server-python==0.67.1
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""The tests for components."""
|
||||
|
||||
from collections.abc import Iterable
|
||||
from enum import StrEnum
|
||||
import itertools
|
||||
from typing import TypedDict
|
||||
@@ -345,6 +346,15 @@ def set_or_remove_state(
|
||||
)
|
||||
|
||||
|
||||
def other_states(state: StrEnum) -> list[str]:
|
||||
def other_states(state: StrEnum | Iterable[StrEnum]) -> list[str]:
|
||||
"""Return a sorted list with all states except the specified one."""
|
||||
return sorted({s.value for s in state.__class__} - {state.value})
|
||||
if isinstance(state, StrEnum):
|
||||
excluded_values = {state.value}
|
||||
enum_class = state.__class__
|
||||
else:
|
||||
if len(state) == 0:
|
||||
raise ValueError("state iterable must not be empty")
|
||||
excluded_values = {s.value for s in state}
|
||||
enum_class = list(state)[0].__class__
|
||||
|
||||
return sorted({s.value for s in enum_class} - excluded_values)
|
||||
|
||||
@@ -1,17 +1,22 @@
|
||||
"""Test climate trigger."""
|
||||
|
||||
from collections.abc import Generator
|
||||
from contextlib import AbstractContextManager, nullcontext as does_not_raise
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.climate.const import (
|
||||
ATTR_HVAC_ACTION,
|
||||
HVACAction,
|
||||
HVACMode,
|
||||
)
|
||||
from homeassistant.const import ATTR_LABEL_ID, CONF_ENTITY_ID
|
||||
from homeassistant.components.climate.trigger import CONF_HVAC_MODE
|
||||
from homeassistant.const import ATTR_LABEL_ID, CONF_ENTITY_ID, CONF_OPTIONS, CONF_TARGET
|
||||
from homeassistant.core import HomeAssistant, ServiceCall
|
||||
from homeassistant.helpers.trigger import async_validate_trigger_config
|
||||
|
||||
from tests.components import (
|
||||
StateDescription,
|
||||
@@ -48,6 +53,7 @@ async def target_climates(hass: HomeAssistant) -> list[str]:
|
||||
@pytest.mark.parametrize(
|
||||
"trigger_key",
|
||||
[
|
||||
"climate.hvac_mode_changed",
|
||||
"climate.turned_off",
|
||||
"climate.turned_on",
|
||||
"climate.started_heating",
|
||||
@@ -66,20 +72,105 @@ async def test_climate_triggers_gated_by_labs_flag(
|
||||
) in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
|
||||
@pytest.mark.parametrize(
|
||||
("trigger", "trigger_options", "expected_result"),
|
||||
[
|
||||
# Test validating climate.hvac_mode_changed
|
||||
# Valid configurations
|
||||
(
|
||||
"climate.hvac_mode_changed",
|
||||
{CONF_HVAC_MODE: [HVACMode.HEAT, HVACMode.COOL]},
|
||||
does_not_raise(),
|
||||
),
|
||||
(
|
||||
"climate.hvac_mode_changed",
|
||||
{CONF_HVAC_MODE: HVACMode.HEAT},
|
||||
does_not_raise(),
|
||||
),
|
||||
# Invalid configurations
|
||||
(
|
||||
"climate.hvac_mode_changed",
|
||||
# Empty hvac_mode list
|
||||
{CONF_HVAC_MODE: []},
|
||||
pytest.raises(vol.Invalid),
|
||||
),
|
||||
(
|
||||
"climate.hvac_mode_changed",
|
||||
# Missing CONF_HVAC_MODE
|
||||
{},
|
||||
pytest.raises(vol.Invalid),
|
||||
),
|
||||
(
|
||||
"climate.hvac_mode_changed",
|
||||
{CONF_HVAC_MODE: ["invalid_mode"]},
|
||||
pytest.raises(vol.Invalid),
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_climate_trigger_validation(
|
||||
hass: HomeAssistant,
|
||||
trigger: str,
|
||||
trigger_options: dict[str, Any],
|
||||
expected_result: AbstractContextManager,
|
||||
) -> None:
|
||||
"""Test climate trigger config validation."""
|
||||
with expected_result:
|
||||
await async_validate_trigger_config(
|
||||
hass,
|
||||
[
|
||||
{
|
||||
"platform": trigger,
|
||||
CONF_TARGET: {CONF_ENTITY_ID: "climate.test_climate"},
|
||||
CONF_OPTIONS: trigger_options,
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def parametrize_climate_trigger_states(
|
||||
*,
|
||||
trigger: str,
|
||||
trigger_options: dict | None = None,
|
||||
target_states: list[str | None | tuple[str | None, dict]],
|
||||
other_states: list[str | None | tuple[str | None, dict]],
|
||||
additional_attributes: dict | None = None,
|
||||
trigger_from_none: bool = True,
|
||||
) -> list[tuple[str, dict[str, Any], list[StateDescription]]]:
|
||||
"""Parametrize states and expected service call counts."""
|
||||
trigger_options = trigger_options or {}
|
||||
return [
|
||||
(s[0], trigger_options, *s[1:])
|
||||
for s in parametrize_trigger_states(
|
||||
trigger=trigger,
|
||||
target_states=target_states,
|
||||
other_states=other_states,
|
||||
additional_attributes=additional_attributes,
|
||||
trigger_from_none=trigger_from_none,
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_target_config", "entity_id", "entities_in_target"),
|
||||
parametrize_target_entities("climate"),
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
("trigger", "states"),
|
||||
("trigger", "trigger_options", "states"),
|
||||
[
|
||||
*parametrize_trigger_states(
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.hvac_mode_changed",
|
||||
trigger_options={CONF_HVAC_MODE: [HVACMode.HEAT, HVACMode.COOL]},
|
||||
target_states=[HVACMode.HEAT, HVACMode.COOL],
|
||||
other_states=other_states([HVACMode.HEAT, HVACMode.COOL]),
|
||||
),
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.turned_off",
|
||||
target_states=[HVACMode.OFF],
|
||||
other_states=other_states(HVACMode.OFF),
|
||||
),
|
||||
*parametrize_trigger_states(
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.turned_on",
|
||||
target_states=[
|
||||
HVACMode.AUTO,
|
||||
@@ -103,6 +194,7 @@ async def test_climate_state_trigger_behavior_any(
|
||||
entity_id: str,
|
||||
entities_in_target: int,
|
||||
trigger: str,
|
||||
trigger_options: dict[str, Any],
|
||||
states: list[StateDescription],
|
||||
) -> None:
|
||||
"""Test that the climate state trigger fires when any climate state changes to a specific state."""
|
||||
@@ -113,7 +205,7 @@ async def test_climate_state_trigger_behavior_any(
|
||||
set_or_remove_state(hass, eid, states[0]["included"])
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await arm_trigger(hass, trigger, {}, trigger_target_config)
|
||||
await arm_trigger(hass, trigger, trigger_options, trigger_target_config)
|
||||
|
||||
for state in states[1:]:
|
||||
included_state = state["included"]
|
||||
@@ -200,14 +292,20 @@ async def test_climate_state_attribute_trigger_behavior_any(
|
||||
parametrize_target_entities("climate"),
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
("trigger", "states"),
|
||||
("trigger", "trigger_options", "states"),
|
||||
[
|
||||
*parametrize_trigger_states(
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.hvac_mode_changed",
|
||||
trigger_options={CONF_HVAC_MODE: [HVACMode.HEAT, HVACMode.COOL]},
|
||||
target_states=[HVACMode.HEAT, HVACMode.COOL],
|
||||
other_states=other_states([HVACMode.HEAT, HVACMode.COOL]),
|
||||
),
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.turned_off",
|
||||
target_states=[HVACMode.OFF],
|
||||
other_states=other_states(HVACMode.OFF),
|
||||
),
|
||||
*parametrize_trigger_states(
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.turned_on",
|
||||
target_states=[
|
||||
HVACMode.AUTO,
|
||||
@@ -231,6 +329,7 @@ async def test_climate_state_trigger_behavior_first(
|
||||
entities_in_target: int,
|
||||
entity_id: str,
|
||||
trigger: str,
|
||||
trigger_options: dict[str, Any],
|
||||
states: list[StateDescription],
|
||||
) -> None:
|
||||
"""Test that the climate state trigger fires when the first climate changes to a specific state."""
|
||||
@@ -241,7 +340,9 @@ async def test_climate_state_trigger_behavior_first(
|
||||
set_or_remove_state(hass, eid, states[0]["included"])
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await arm_trigger(hass, trigger, {"behavior": "first"}, trigger_target_config)
|
||||
await arm_trigger(
|
||||
hass, trigger, {"behavior": "first"} | trigger_options, trigger_target_config
|
||||
)
|
||||
|
||||
for state in states[1:]:
|
||||
included_state = state["included"]
|
||||
@@ -294,7 +395,7 @@ async def test_climate_state_attribute_trigger_behavior_first(
|
||||
trigger: str,
|
||||
states: list[tuple[tuple[str, dict], int]],
|
||||
) -> None:
|
||||
"""Test that the climate state trigger fires when any climate state changes to a specific state."""
|
||||
"""Test that the climate state trigger fires when the first climate state changes to a specific state."""
|
||||
other_entity_ids = set(target_climates) - {entity_id}
|
||||
|
||||
# Set all climates, including the tested climate, to the initial state
|
||||
@@ -326,14 +427,20 @@ async def test_climate_state_attribute_trigger_behavior_first(
|
||||
parametrize_target_entities("climate"),
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
("trigger", "states"),
|
||||
("trigger", "trigger_options", "states"),
|
||||
[
|
||||
*parametrize_trigger_states(
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.hvac_mode_changed",
|
||||
trigger_options={CONF_HVAC_MODE: [HVACMode.HEAT, HVACMode.COOL]},
|
||||
target_states=[HVACMode.HEAT, HVACMode.COOL],
|
||||
other_states=other_states([HVACMode.HEAT, HVACMode.COOL]),
|
||||
),
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.turned_off",
|
||||
target_states=[HVACMode.OFF],
|
||||
other_states=other_states(HVACMode.OFF),
|
||||
),
|
||||
*parametrize_trigger_states(
|
||||
*parametrize_climate_trigger_states(
|
||||
trigger="climate.turned_on",
|
||||
target_states=[
|
||||
HVACMode.AUTO,
|
||||
@@ -357,6 +464,7 @@ async def test_climate_state_trigger_behavior_last(
|
||||
entities_in_target: int,
|
||||
entity_id: str,
|
||||
trigger: str,
|
||||
trigger_options: dict[str, Any],
|
||||
states: list[StateDescription],
|
||||
) -> None:
|
||||
"""Test that the climate state trigger fires when the last climate changes to a specific state."""
|
||||
@@ -367,7 +475,9 @@ async def test_climate_state_trigger_behavior_last(
|
||||
set_or_remove_state(hass, eid, states[0]["included"])
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await arm_trigger(hass, trigger, {"behavior": "last"}, trigger_target_config)
|
||||
await arm_trigger(
|
||||
hass, trigger, {"behavior": "last"} | trigger_options, trigger_target_config
|
||||
)
|
||||
|
||||
for state in states[1:]:
|
||||
included_state = state["included"]
|
||||
@@ -419,7 +529,7 @@ async def test_climate_state_attribute_trigger_behavior_last(
|
||||
trigger: str,
|
||||
states: list[tuple[tuple[str, dict], int]],
|
||||
) -> None:
|
||||
"""Test that the climate state trigger fires when any climate state changes to a specific state."""
|
||||
"""Test that the climate state trigger fires when the last climate state changes to a specific state."""
|
||||
other_entity_ids = set(target_climates) - {entity_id}
|
||||
|
||||
# Set all climates, including the tested climate, to the initial state
|
||||
|
||||
@@ -133,6 +133,7 @@
|
||||
# name: test_switches[switch.gs012345-state]
|
||||
StateSnapshot({
|
||||
'attributes': ReadOnlyDict({
|
||||
'entity_picture': 'https://lion.lamarzocco.io/img/thing-model/detail/gs3av/gs3av-1.png',
|
||||
'friendly_name': 'GS012345',
|
||||
}),
|
||||
'context': <ANY>,
|
||||
|
||||
@@ -46,7 +46,7 @@
|
||||
"0/40/65532": 0,
|
||||
"0/40/0": 19,
|
||||
"0/40/6": "**REDACTED**",
|
||||
"0/40/1": "Beep Home",
|
||||
"0/40/1": "TEST_VENDOR",
|
||||
"0/40/2": 65521,
|
||||
"0/40/3": "Mock speaker",
|
||||
"0/40/4": 32768,
|
||||
@@ -54,7 +54,7 @@
|
||||
"0/40/8": "1.0",
|
||||
"0/40/9": 1,
|
||||
"0/40/10": "1.0",
|
||||
"0/40/18": "A576929DE6D138DC",
|
||||
"0/40/18": "123456789",
|
||||
"0/40/19": {
|
||||
"0": 3,
|
||||
"1": 3
|
||||
@@ -221,6 +221,8 @@
|
||||
"1/8/65532": 0,
|
||||
"1/8/65533": 6,
|
||||
"1/8/0": 47,
|
||||
"1/8/2": 0,
|
||||
"1/8/3": 100,
|
||||
"1/8/17": null,
|
||||
"1/8/15": 0,
|
||||
"1/8/65528": [],
|
||||
|
||||
@@ -3691,6 +3691,64 @@
|
||||
'state': '4.0',
|
||||
})
|
||||
# ---
|
||||
# name: test_numbers[speaker][number.mock_speaker_volume-entry]
|
||||
EntityRegistryEntrySnapshot({
|
||||
'aliases': set({
|
||||
}),
|
||||
'area_id': None,
|
||||
'capabilities': dict({
|
||||
'max': 100,
|
||||
'min': 0,
|
||||
'mode': <NumberMode.SLIDER: 'slider'>,
|
||||
'step': 1,
|
||||
}),
|
||||
'config_entry_id': <ANY>,
|
||||
'config_subentry_id': <ANY>,
|
||||
'device_class': None,
|
||||
'device_id': <ANY>,
|
||||
'disabled_by': None,
|
||||
'domain': 'number',
|
||||
'entity_category': None,
|
||||
'entity_id': 'number.mock_speaker_volume',
|
||||
'has_entity_name': True,
|
||||
'hidden_by': None,
|
||||
'icon': None,
|
||||
'id': <ANY>,
|
||||
'labels': set({
|
||||
}),
|
||||
'name': None,
|
||||
'options': dict({
|
||||
}),
|
||||
'original_device_class': None,
|
||||
'original_icon': None,
|
||||
'original_name': 'Volume',
|
||||
'platform': 'matter',
|
||||
'previous_unique_id': None,
|
||||
'suggested_object_id': None,
|
||||
'supported_features': 0,
|
||||
'translation_key': 'speaker_setpoint',
|
||||
'unique_id': '00000000000004D2-000000000000006B-MatterNodeDevice-1-speaker_setpoint-8-0',
|
||||
'unit_of_measurement': '%',
|
||||
})
|
||||
# ---
|
||||
# name: test_numbers[speaker][number.mock_speaker_volume-state]
|
||||
StateSnapshot({
|
||||
'attributes': ReadOnlyDict({
|
||||
'friendly_name': 'Mock speaker Volume',
|
||||
'max': 100,
|
||||
'min': 0,
|
||||
'mode': <NumberMode.SLIDER: 'slider'>,
|
||||
'step': 1,
|
||||
'unit_of_measurement': '%',
|
||||
}),
|
||||
'context': <ANY>,
|
||||
'entity_id': 'number.mock_speaker_volume',
|
||||
'last_changed': <ANY>,
|
||||
'last_reported': <ANY>,
|
||||
'last_updated': <ANY>,
|
||||
'state': '47',
|
||||
})
|
||||
# ---
|
||||
# name: test_numbers[valve][number.valve_default_open_duration-entry]
|
||||
EntityRegistryEntrySnapshot({
|
||||
'aliases': set({
|
||||
|
||||
@@ -4,68 +4,21 @@ from collections.abc import AsyncGenerator
|
||||
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
|
||||
|
||||
import pytest
|
||||
import tibber
|
||||
|
||||
from homeassistant.components.application_credentials import (
|
||||
ClientCredential,
|
||||
async_import_client_credential,
|
||||
)
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.tibber import TibberRuntimeData
|
||||
from homeassistant.components.tibber.const import AUTH_IMPLEMENTATION, DOMAIN
|
||||
from homeassistant.components.tibber.coordinator import TibberDataAPICoordinator
|
||||
from homeassistant.components.tibber.const import DOMAIN
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
def create_tibber_device(
|
||||
device_id: str = "device-id",
|
||||
external_id: str = "external-id",
|
||||
name: str = "Test Device",
|
||||
brand: str = "Tibber",
|
||||
model: str = "Gen1",
|
||||
value: float | None = 72.0,
|
||||
home_id: str = "home-id",
|
||||
) -> tibber.data_api.TibberDevice:
|
||||
"""Create a fake Tibber Data API device."""
|
||||
device_data = {
|
||||
"id": device_id,
|
||||
"externalId": external_id,
|
||||
"info": {
|
||||
"name": name,
|
||||
"brand": brand,
|
||||
"model": model,
|
||||
},
|
||||
"capabilities": [
|
||||
{
|
||||
"id": "storage.stateOfCharge",
|
||||
"value": value,
|
||||
"description": "State of charge",
|
||||
"unit": "%",
|
||||
},
|
||||
{
|
||||
"id": "unknown.sensor.id",
|
||||
"value": None,
|
||||
"description": "Unknown",
|
||||
"unit": "",
|
||||
},
|
||||
],
|
||||
}
|
||||
return tibber.data_api.TibberDevice(device_data, home_id=home_id)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config_entry(hass: HomeAssistant) -> MockConfigEntry:
|
||||
"""Tibber config entry."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
CONF_ACCESS_TOKEN: "token",
|
||||
AUTH_IMPLEMENTATION: DOMAIN,
|
||||
},
|
||||
data={CONF_ACCESS_TOKEN: "token"},
|
||||
unique_id="tibber",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
@@ -86,88 +39,8 @@ async def mock_tibber_setup(
|
||||
tibber_mock.name = PropertyMock(return_value=title)
|
||||
tibber_mock.send_notification = AsyncMock()
|
||||
tibber_mock.rt_disconnect = AsyncMock()
|
||||
tibber_mock.get_homes = MagicMock(return_value=[])
|
||||
|
||||
session_mock = MagicMock()
|
||||
session_mock.async_ensure_token_valid = AsyncMock()
|
||||
session_mock.token = {CONF_ACCESS_TOKEN: "test-token"}
|
||||
|
||||
implementation_mock = MagicMock()
|
||||
|
||||
data_api_client_mock = MagicMock()
|
||||
data_api_client_mock.get_all_devices = AsyncMock(return_value={})
|
||||
data_api_client_mock.update_devices = AsyncMock(return_value={})
|
||||
|
||||
with (
|
||||
patch("tibber.Tibber", return_value=tibber_mock),
|
||||
patch(
|
||||
"homeassistant.components.tibber.async_get_config_entry_implementation",
|
||||
return_value=implementation_mock,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.tibber.OAuth2Session",
|
||||
return_value=session_mock,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.tibber.coordinator.TibberDataAPICoordinator._async_get_client",
|
||||
return_value=data_api_client_mock,
|
||||
),
|
||||
):
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
yield tibber_mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def setup_credentials(recorder_mock: Recorder, hass: HomeAssistant) -> None:
|
||||
"""Set up application credentials for the OAuth flow."""
|
||||
assert await async_setup_component(hass, "application_credentials", {})
|
||||
await async_import_client_credential(
|
||||
hass,
|
||||
DOMAIN,
|
||||
ClientCredential("test-client-id", "test-client-secret"),
|
||||
DOMAIN,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data_api_entry(hass: HomeAssistant) -> MockConfigEntry:
|
||||
"""Create a Data API Tibber config entry."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_ACCESS_TOKEN: "token"},
|
||||
unique_id="data-api",
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
return entry
|
||||
|
||||
|
||||
def create_mock_runtime(
|
||||
async_get_client: AsyncMock | None = None,
|
||||
tibber_connection: MagicMock | None = None,
|
||||
coordinator_data: dict | None = None,
|
||||
) -> TibberRuntimeData:
|
||||
"""Create a mock TibberRuntimeData.
|
||||
|
||||
Args:
|
||||
async_get_client: Optional async mock for getting the Data API client.
|
||||
tibber_connection: Optional mock for the GraphQL connection.
|
||||
coordinator_data: Optional data dict for the coordinator.
|
||||
|
||||
"""
|
||||
session = MagicMock()
|
||||
session.async_ensure_token_valid = AsyncMock()
|
||||
session.token = {CONF_ACCESS_TOKEN: "test-token"}
|
||||
|
||||
coordinator = MagicMock(spec=TibberDataAPICoordinator)
|
||||
coordinator.data = coordinator_data if coordinator_data is not None else {}
|
||||
coordinator.sensors_by_device = {}
|
||||
|
||||
runtime = MagicMock(spec=TibberRuntimeData)
|
||||
runtime.session = session
|
||||
runtime.tibber_connection = tibber_connection or MagicMock()
|
||||
runtime.tibber_connection.get_homes = MagicMock(return_value=[])
|
||||
runtime.data_api_coordinator = coordinator
|
||||
runtime.async_get_client = async_get_client or AsyncMock()
|
||||
|
||||
return runtime
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
"""Tests for Tibber config flow."""
|
||||
|
||||
import builtins
|
||||
from http import HTTPStatus
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
from asyncio import TimeoutError
|
||||
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
|
||||
|
||||
from aiohttp import ClientError
|
||||
import pytest
|
||||
@@ -15,22 +13,16 @@ from tibber import (
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.tibber.application_credentials import TOKEN_URL
|
||||
from homeassistant.components.tibber.config_flow import (
|
||||
DATA_API_DEFAULT_SCOPES,
|
||||
ERR_CLIENT,
|
||||
ERR_TIMEOUT,
|
||||
ERR_TOKEN,
|
||||
)
|
||||
from homeassistant.components.tibber.const import AUTH_IMPLEMENTATION, DOMAIN
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN
|
||||
from homeassistant.components.tibber.const import DOMAIN
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResultType
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
from tests.test_util.aiohttp import AiohttpClientMocker
|
||||
from tests.typing import ClientSessionGenerator
|
||||
|
||||
|
||||
@pytest.fixture(name="tibber_setup", autouse=True)
|
||||
def tibber_setup_fixture():
|
||||
@@ -39,22 +31,6 @@ def tibber_setup_fixture():
|
||||
yield
|
||||
|
||||
|
||||
def _mock_tibber(
|
||||
*,
|
||||
user_id: str = "unique_user_id",
|
||||
title: str = "Mock Name",
|
||||
update_side_effect: Exception | None = None,
|
||||
) -> MagicMock:
|
||||
"""Return a mocked Tibber GraphQL client."""
|
||||
tibber_mock = MagicMock()
|
||||
tibber_mock.user_id = user_id
|
||||
tibber_mock.name = title
|
||||
tibber_mock.update_info = AsyncMock()
|
||||
if update_side_effect is not None:
|
||||
tibber_mock.update_info.side_effect = update_side_effect
|
||||
return tibber_mock
|
||||
|
||||
|
||||
async def test_show_config_form(recorder_mock: Recorder, hass: HomeAssistant) -> None:
|
||||
"""Test show configuration form."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
@@ -65,35 +41,62 @@ async def test_show_config_form(recorder_mock: Recorder, hass: HomeAssistant) ->
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
|
||||
async def test_create_entry(recorder_mock: Recorder, hass: HomeAssistant) -> None:
|
||||
"""Test create entry from user input."""
|
||||
test_data = {
|
||||
CONF_ACCESS_TOKEN: "valid",
|
||||
}
|
||||
|
||||
unique_user_id = "unique_user_id"
|
||||
title = "title"
|
||||
|
||||
tibber_mock = MagicMock()
|
||||
type(tibber_mock).update_info = AsyncMock(return_value=True)
|
||||
type(tibber_mock).user_id = PropertyMock(return_value=unique_user_id)
|
||||
type(tibber_mock).name = PropertyMock(return_value=title)
|
||||
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == title
|
||||
assert result["data"] == test_data
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("exception", "expected_error"),
|
||||
[
|
||||
(builtins.TimeoutError(), ERR_TIMEOUT),
|
||||
(ClientError(), ERR_CLIENT),
|
||||
(TimeoutError, ERR_TIMEOUT),
|
||||
(ClientError, ERR_CLIENT),
|
||||
(InvalidLoginError(401), ERR_TOKEN),
|
||||
(RetryableHttpExceptionError(503), ERR_CLIENT),
|
||||
(FatalHttpExceptionError(404), ERR_CLIENT),
|
||||
],
|
||||
)
|
||||
async def test_graphql_step_exceptions(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
exception: Exception,
|
||||
expected_error: str,
|
||||
async def test_create_entry_exceptions(
|
||||
recorder_mock: Recorder, hass: HomeAssistant, exception, expected_error
|
||||
) -> None:
|
||||
"""Validate GraphQL errors are surfaced."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
"""Test create entry from user input."""
|
||||
test_data = {
|
||||
CONF_ACCESS_TOKEN: "valid",
|
||||
}
|
||||
|
||||
unique_user_id = "unique_user_id"
|
||||
title = "title"
|
||||
|
||||
tibber_mock = MagicMock()
|
||||
type(tibber_mock).update_info = AsyncMock(side_effect=exception)
|
||||
type(tibber_mock).user_id = PropertyMock(return_value=unique_user_id)
|
||||
type(tibber_mock).name = PropertyMock(return_value=title)
|
||||
|
||||
tibber_mock = _mock_tibber(update_side_effect=exception)
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_ACCESS_TOKEN: "invalid"}
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "user"
|
||||
assert result["errors"][CONF_ACCESS_TOKEN] == expected_error
|
||||
|
||||
|
||||
@@ -101,203 +104,13 @@ async def test_flow_entry_already_exists(
|
||||
recorder_mock: Recorder, hass: HomeAssistant, config_entry
|
||||
) -> None:
|
||||
"""Test user input for config_entry that already exists."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
test_data = {
|
||||
CONF_ACCESS_TOKEN: "valid",
|
||||
}
|
||||
|
||||
tibber_mock = _mock_tibber(user_id="tibber")
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
|
||||
async def test_reauth_flow_steps(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
config_entry: MockConfigEntry,
|
||||
) -> None:
|
||||
"""Test the reauth flow goes through reauth_confirm to user step."""
|
||||
reauth_flow = await config_entry.start_reauth_flow(hass)
|
||||
|
||||
assert reauth_flow["type"] is FlowResultType.FORM
|
||||
assert reauth_flow["step_id"] == "reauth_confirm"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(reauth_flow["flow_id"])
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "reauth_confirm"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
reauth_flow["flow_id"],
|
||||
user_input={},
|
||||
)
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
|
||||
async def test_oauth_create_entry_missing_configuration(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Abort OAuth finalize if GraphQL step did not run."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_USER},
|
||||
)
|
||||
handler = hass.config_entries.flow._progress[result["flow_id"]]
|
||||
|
||||
flow_result = await handler.async_oauth_create_entry(
|
||||
{CONF_TOKEN: {CONF_ACCESS_TOKEN: "rest-token"}}
|
||||
)
|
||||
|
||||
assert flow_result["type"] is FlowResultType.ABORT
|
||||
assert flow_result["reason"] == "missing_configuration"
|
||||
|
||||
|
||||
async def test_oauth_create_entry_cannot_connect_userinfo(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Abort OAuth finalize when Data API userinfo cannot be retrieved."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_USER},
|
||||
)
|
||||
handler = hass.config_entries.flow._progress[result["flow_id"]]
|
||||
handler._access_token = "graphql-token"
|
||||
|
||||
data_api_client = MagicMock()
|
||||
data_api_client.get_userinfo = AsyncMock(side_effect=ClientError())
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.tibber.config_flow.tibber_data_api.TibberDataAPI",
|
||||
return_value=data_api_client,
|
||||
):
|
||||
flow_result = await handler.async_oauth_create_entry(
|
||||
{CONF_TOKEN: {CONF_ACCESS_TOKEN: "rest-token"}}
|
||||
)
|
||||
|
||||
assert flow_result["type"] is FlowResultType.ABORT
|
||||
assert flow_result["reason"] == "cannot_connect"
|
||||
|
||||
|
||||
async def test_data_api_requires_credentials(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Abort when OAuth credentials are missing."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
tibber_mock = _mock_tibber()
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "missing_credentials"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
|
||||
async def test_data_api_extra_authorize_scope(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Ensure the OAuth implementation requests Tibber scopes."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
tibber_mock = _mock_tibber()
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
|
||||
)
|
||||
|
||||
handler = hass.config_entries.flow._progress[result["flow_id"]]
|
||||
assert handler.extra_authorize_data["scope"] == " ".join(DATA_API_DEFAULT_SCOPES)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
|
||||
async def test_full_flow_success(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client_no_auth: ClientSessionGenerator,
|
||||
aioclient_mock: AiohttpClientMocker,
|
||||
) -> None:
|
||||
"""Test configuring Tibber via GraphQL + OAuth."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
tibber_mock = _mock_tibber()
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_ACCESS_TOKEN: "graphql-token"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.EXTERNAL_STEP
|
||||
authorize_url = result["url"]
|
||||
state = parse_qs(urlparse(authorize_url).query)["state"][0]
|
||||
|
||||
client = await hass_client_no_auth()
|
||||
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
|
||||
assert resp.status == HTTPStatus.OK
|
||||
|
||||
aioclient_mock.post(
|
||||
TOKEN_URL,
|
||||
json={
|
||||
"access_token": "mock-access-token",
|
||||
"refresh_token": "mock-refresh-token",
|
||||
"token_type": "bearer",
|
||||
"expires_in": 3600,
|
||||
},
|
||||
)
|
||||
|
||||
data_api_client = MagicMock()
|
||||
data_api_client.get_userinfo = AsyncMock(return_value={"name": "Mock Name"})
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.tibber.config_flow.tibber_data_api.TibberDataAPI",
|
||||
return_value=data_api_client,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] is FlowResultType.CREATE_ENTRY
|
||||
data = result["data"]
|
||||
assert data[CONF_TOKEN]["access_token"] == "mock-access-token"
|
||||
assert data[CONF_ACCESS_TOKEN] == "graphql-token"
|
||||
assert data[AUTH_IMPLEMENTATION] == DOMAIN
|
||||
assert result["title"] == "Mock Name"
|
||||
|
||||
|
||||
async def test_data_api_abort_when_already_configured(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Ensure only a single Data API entry can be configured."""
|
||||
existing_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
AUTH_IMPLEMENTATION: DOMAIN,
|
||||
CONF_TOKEN: {"access_token": "existing"},
|
||||
CONF_ACCESS_TOKEN: "stored-graphql",
|
||||
},
|
||||
unique_id="unique_user_id",
|
||||
title="Existing",
|
||||
)
|
||||
existing_entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
tibber_mock = _mock_tibber()
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_ACCESS_TOKEN: "new-token"}
|
||||
with patch("tibber.Tibber.update_info", return_value=None):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
|
||||
@@ -1,121 +0,0 @@
|
||||
"""Tests for the Tibber Data API coordinator and sensors."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import aiohttp
|
||||
import pytest
|
||||
import tibber
|
||||
|
||||
from homeassistant.components.tibber.coordinator import TibberDataAPICoordinator
|
||||
from homeassistant.components.tibber.sensor import (
|
||||
TibberDataAPISensor,
|
||||
_setup_data_api_sensors,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
||||
from homeassistant.helpers.update_coordinator import UpdateFailed
|
||||
|
||||
from .conftest import create_mock_runtime, create_tibber_device
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
async def test_data_api_setup_adds_entities(
|
||||
hass: HomeAssistant,
|
||||
data_api_entry: MockConfigEntry,
|
||||
) -> None:
|
||||
"""Ensure Data API sensors are created and coordinator refreshes data."""
|
||||
client = MagicMock()
|
||||
client.get_all_devices = AsyncMock(
|
||||
return_value={"device-id": create_tibber_device(value=72.0)}
|
||||
)
|
||||
client.update_devices = AsyncMock(
|
||||
return_value={"device-id": create_tibber_device(value=83.0)}
|
||||
)
|
||||
async_get_client = AsyncMock(return_value=client)
|
||||
runtime = create_mock_runtime(async_get_client=async_get_client)
|
||||
|
||||
data_api_entry.runtime_data = runtime
|
||||
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, data_api_entry)
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
runtime.data_api_coordinator = coordinator
|
||||
|
||||
added_entities: list[TibberDataAPISensor] = []
|
||||
|
||||
def async_add_entities(entities: list[TibberDataAPISensor]) -> None:
|
||||
added_entities.extend(entities)
|
||||
|
||||
_setup_data_api_sensors(data_api_entry, async_add_entities)
|
||||
|
||||
assert async_get_client.await_count == 2
|
||||
client.get_all_devices.assert_awaited_once()
|
||||
client.update_devices.assert_awaited_once()
|
||||
|
||||
assert len(added_entities) == 1
|
||||
sensor = added_entities[0]
|
||||
assert sensor.entity_description.key == "storage.stateOfCharge"
|
||||
assert sensor.native_value == 83.0
|
||||
assert sensor.available
|
||||
|
||||
sensor.coordinator.data = {}
|
||||
sensor.coordinator.sensors_by_device = {}
|
||||
assert sensor.native_value is None
|
||||
assert not sensor.available
|
||||
|
||||
|
||||
async def test_data_api_coordinator_first_refresh_failure(
|
||||
hass: HomeAssistant, data_api_entry: MockConfigEntry
|
||||
) -> None:
|
||||
"""Ensure network failures during setup raise ConfigEntryNotReady."""
|
||||
async_get_client = AsyncMock(side_effect=aiohttp.ClientError("boom"))
|
||||
runtime = create_mock_runtime(async_get_client=async_get_client)
|
||||
data_api_entry.runtime_data = runtime
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, data_api_entry)
|
||||
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
|
||||
|
||||
with pytest.raises(ConfigEntryNotReady):
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
assert isinstance(coordinator.last_exception, UpdateFailed)
|
||||
|
||||
|
||||
async def test_data_api_coordinator_first_refresh_auth_failed(
|
||||
hass: HomeAssistant, data_api_entry: MockConfigEntry
|
||||
) -> None:
|
||||
"""Ensure auth failures during setup propagate."""
|
||||
async_get_client = AsyncMock(side_effect=ConfigEntryAuthFailed("invalid"))
|
||||
runtime = create_mock_runtime(async_get_client=async_get_client)
|
||||
data_api_entry.runtime_data = runtime
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, data_api_entry)
|
||||
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
|
||||
|
||||
with pytest.raises(ConfigEntryAuthFailed):
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"exception",
|
||||
[
|
||||
aiohttp.ClientError("err"),
|
||||
TimeoutError(),
|
||||
tibber.UserAgentMissingError("err"),
|
||||
],
|
||||
)
|
||||
async def test_data_api_coordinator_update_failures(
|
||||
hass: HomeAssistant, data_api_entry: MockConfigEntry, exception: Exception
|
||||
) -> None:
|
||||
"""Ensure update failures are wrapped in UpdateFailed."""
|
||||
async_get_client = AsyncMock(side_effect=exception)
|
||||
runtime = create_mock_runtime(async_get_client=async_get_client)
|
||||
data_api_entry.runtime_data = runtime
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, data_api_entry)
|
||||
|
||||
with pytest.raises(UpdateFailed):
|
||||
await coordinator._async_update_data()
|
||||
@@ -1,23 +1,13 @@
|
||||
"""Test the Tibber diagnostics."""
|
||||
"""Test the Netatmo diagnostics."""
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import aiohttp
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
import tibber
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.tibber.diagnostics import (
|
||||
async_get_config_entry_diagnostics,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from .conftest import create_mock_runtime, create_tibber_device
|
||||
from .test_common import mock_get_homes
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
from tests.components.diagnostics import get_diagnostics_for_config_entry
|
||||
from tests.typing import ClientSessionGenerator
|
||||
|
||||
@@ -26,93 +16,41 @@ async def test_entry_diagnostics(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
config_entry: MockConfigEntry,
|
||||
mock_tibber_setup: MagicMock,
|
||||
snapshot: SnapshotAssertion,
|
||||
config_entry,
|
||||
) -> None:
|
||||
"""Test config entry diagnostics."""
|
||||
tibber_mock = mock_tibber_setup
|
||||
tibber_mock.get_homes.return_value = []
|
||||
with patch(
|
||||
"tibber.Tibber.update_info",
|
||||
return_value=None,
|
||||
):
|
||||
assert await async_setup_component(hass, "tibber", {})
|
||||
|
||||
config_entry.runtime_data.data_api_coordinator.data = {}
|
||||
config_entry.runtime_data.data_api_coordinator.sensors_by_device = {}
|
||||
await hass.async_block_till_done()
|
||||
|
||||
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
|
||||
assert result == snapshot(name="empty")
|
||||
with patch(
|
||||
"tibber.Tibber.get_homes",
|
||||
return_value=[],
|
||||
):
|
||||
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
|
||||
|
||||
tibber_mock.get_homes.side_effect = mock_get_homes
|
||||
|
||||
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
|
||||
assert result == snapshot(name="with_homes")
|
||||
|
||||
|
||||
async def test_data_api_diagnostics_no_data(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
data_api_entry: MockConfigEntry,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test Data API diagnostics when coordinator has no data."""
|
||||
data_api_entry.runtime_data = create_mock_runtime()
|
||||
|
||||
result = await async_get_config_entry_diagnostics(hass, data_api_entry)
|
||||
assert result == snapshot
|
||||
|
||||
|
||||
async def test_data_api_diagnostics_with_devices(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
data_api_entry: MockConfigEntry,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test Data API diagnostics with successful device retrieval."""
|
||||
devices = {
|
||||
"device-1": create_tibber_device(
|
||||
device_id="device-1",
|
||||
name="Device 1",
|
||||
brand="Tibber",
|
||||
model="Test Model",
|
||||
),
|
||||
"device-2": create_tibber_device(
|
||||
device_id="device-2",
|
||||
name="Device 2",
|
||||
brand="Tibber",
|
||||
model="Test Model",
|
||||
),
|
||||
assert result == {
|
||||
"homes": [],
|
||||
}
|
||||
|
||||
data_api_entry.runtime_data = create_mock_runtime(coordinator_data=devices)
|
||||
with patch(
|
||||
"tibber.Tibber.get_homes",
|
||||
side_effect=mock_get_homes,
|
||||
):
|
||||
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
|
||||
|
||||
result = await async_get_config_entry_diagnostics(hass, data_api_entry)
|
||||
assert result == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("exception", "expected_error"),
|
||||
[
|
||||
(ConfigEntryAuthFailed("Auth failed"), "Authentication failed"),
|
||||
(TimeoutError(), "Timeout error"),
|
||||
(aiohttp.ClientError("Connection error"), "Client error"),
|
||||
(tibber.InvalidLoginError(401), "Invalid login"),
|
||||
(tibber.RetryableHttpExceptionError(503), "Retryable HTTP error (503)"),
|
||||
(tibber.FatalHttpExceptionError(404), "Fatal HTTP error (404)"),
|
||||
],
|
||||
)
|
||||
async def test_data_api_diagnostics_exceptions(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
data_api_entry: MockConfigEntry,
|
||||
exception: Exception,
|
||||
expected_error: str,
|
||||
) -> None:
|
||||
"""Test Data API diagnostics with various exception scenarios."""
|
||||
runtime = create_mock_runtime()
|
||||
type(runtime.data_api_coordinator).data = property(
|
||||
lambda self: (_ for _ in ()).throw(exception)
|
||||
)
|
||||
data_api_entry.runtime_data = runtime
|
||||
|
||||
result = await async_get_config_entry_diagnostics(hass, data_api_entry)
|
||||
|
||||
assert result["error"] == expected_error
|
||||
assert result["devices"] == []
|
||||
assert result == {
|
||||
"homes": [
|
||||
{
|
||||
"last_data_timestamp": "2016-01-01T12:48:57",
|
||||
"has_active_subscription": True,
|
||||
"has_real_time_consumption": False,
|
||||
"last_cons_data_timestamp": "2016-01-01T12:44:57",
|
||||
"country": "NO",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
@@ -1,17 +1,11 @@
|
||||
"""Test loading of the Tibber config entry."""
|
||||
|
||||
from unittest.mock import ANY, AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.tibber import DOMAIN, TibberRuntimeData, async_setup_entry
|
||||
from homeassistant.components.tibber import DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
async def test_entry_unload(
|
||||
@@ -25,69 +19,3 @@ async def test_entry_unload(
|
||||
mock_tibber_setup.rt_disconnect.assert_called_once()
|
||||
await hass.async_block_till_done(wait_background_tasks=True)
|
||||
assert entry.state is ConfigEntryState.NOT_LOADED
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_data_api_runtime_creates_client(hass: HomeAssistant) -> None:
|
||||
"""Ensure the data API runtime creates and caches the client."""
|
||||
session = MagicMock()
|
||||
session.async_ensure_token_valid = AsyncMock()
|
||||
session.token = {CONF_ACCESS_TOKEN: "access-token"}
|
||||
|
||||
runtime = TibberRuntimeData(
|
||||
session=session,
|
||||
tibber_connection=MagicMock(),
|
||||
)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.tibber.tibber_data_api.TibberDataAPI"
|
||||
) as mock_client_cls:
|
||||
mock_client = MagicMock()
|
||||
mock_client.set_access_token = MagicMock()
|
||||
mock_client_cls.return_value = mock_client
|
||||
|
||||
client = await runtime.async_get_client(hass)
|
||||
|
||||
mock_client_cls.assert_called_once_with("access-token", websession=ANY)
|
||||
session.async_ensure_token_valid.assert_awaited_once()
|
||||
mock_client.set_access_token.assert_called_once_with("access-token")
|
||||
assert client is mock_client
|
||||
|
||||
mock_client.set_access_token.reset_mock()
|
||||
session.async_ensure_token_valid.reset_mock()
|
||||
|
||||
cached_client = await runtime.async_get_client(hass)
|
||||
|
||||
mock_client_cls.assert_called_once()
|
||||
session.async_ensure_token_valid.assert_awaited_once()
|
||||
mock_client.set_access_token.assert_called_once_with("access-token")
|
||||
assert cached_client is client
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_data_api_runtime_missing_token_raises(hass: HomeAssistant) -> None:
|
||||
"""Ensure missing tokens trigger reauthentication."""
|
||||
session = MagicMock()
|
||||
session.async_ensure_token_valid = AsyncMock()
|
||||
session.token = {}
|
||||
|
||||
runtime = TibberRuntimeData(
|
||||
session=session,
|
||||
tibber_connection=MagicMock(),
|
||||
)
|
||||
|
||||
with pytest.raises(ConfigEntryAuthFailed):
|
||||
await runtime.async_get_client(hass)
|
||||
session.async_ensure_token_valid.assert_awaited_once()
|
||||
|
||||
|
||||
async def test_setup_requires_data_api_reauth(hass: HomeAssistant) -> None:
|
||||
"""Ensure legacy entries trigger reauth to configure Data API."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_ACCESS_TOKEN: "legacy-token"},
|
||||
unique_id="legacy",
|
||||
)
|
||||
|
||||
with pytest.raises(ConfigEntryAuthFailed):
|
||||
await async_setup_entry(hass, entry)
|
||||
|
||||
Reference in New Issue
Block a user