mirror of
https://github.com/home-assistant/core.git
synced 2025-11-18 15:30:10 +00:00
Compare commits
18 Commits
setpoint_c
...
tibber_dat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c036128fa | ||
|
|
16d898cc8e | ||
|
|
a7225c7cd4 | ||
|
|
433a429c5a | ||
|
|
c4770ed423 | ||
|
|
df329fd273 | ||
|
|
6eb40574bc | ||
|
|
4fd1ef5483 | ||
|
|
7ec5d5305d | ||
|
|
7f31d2538e | ||
|
|
e1943307cf | ||
|
|
a06529d187 | ||
|
|
21554af6a1 | ||
|
|
b4aae93c45 | ||
|
|
1f9c244c5c | ||
|
|
9fa1b1b8df | ||
|
|
f3ac3ecf05 | ||
|
|
9477b2206b |
@@ -1,33 +1,83 @@
|
||||
"""Support for Tibber."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
|
||||
import aiohttp
|
||||
from aiohttp.client_exceptions import ClientError, ClientResponseError
|
||||
import tibber
|
||||
from tibber import data_api as tibber_data_api
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN, EVENT_HOMEASSISTANT_STOP, Platform
|
||||
from homeassistant.core import Event, HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryNotReady
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.config_entry_oauth2_flow import (
|
||||
ImplementationUnavailableError,
|
||||
OAuth2Session,
|
||||
async_get_config_entry_implementation,
|
||||
)
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.util import dt as dt_util, ssl as ssl_util
|
||||
|
||||
from .const import DATA_HASS_CONFIG, DOMAIN
|
||||
from .const import (
|
||||
API_TYPE_DATA_API,
|
||||
API_TYPE_GRAPHQL,
|
||||
CONF_API_TYPE,
|
||||
DATA_HASS_CONFIG,
|
||||
DOMAIN,
|
||||
)
|
||||
from .services import async_setup_services
|
||||
|
||||
PLATFORMS = [Platform.NOTIFY, Platform.SENSOR]
|
||||
GRAPHQL_PLATFORMS = [Platform.NOTIFY, Platform.SENSOR]
|
||||
DATA_API_PLATFORMS = [Platform.SENSOR]
|
||||
|
||||
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TibberGraphQLRuntimeData:
|
||||
"""Runtime data for GraphQL-based Tibber entries."""
|
||||
|
||||
tibber: tibber.Tibber
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TibberDataAPIRuntimeData:
|
||||
"""Runtime data for Tibber Data API entries."""
|
||||
|
||||
session: OAuth2Session
|
||||
_client: tibber_data_api.TibberDataAPI | None = None
|
||||
|
||||
async def async_get_client(
|
||||
self, hass: HomeAssistant
|
||||
) -> tibber_data_api.TibberDataAPI:
|
||||
"""Return an authenticated Tibber Data API client."""
|
||||
await self.session.async_ensure_token_valid()
|
||||
token = self.session.token
|
||||
access_token = token.get(CONF_ACCESS_TOKEN)
|
||||
if not access_token:
|
||||
raise ConfigEntryAuthFailed("Access token missing from OAuth session")
|
||||
if self._client is None:
|
||||
self._client = tibber_data_api.TibberDataAPI(
|
||||
access_token,
|
||||
websession=async_get_clientsession(hass),
|
||||
)
|
||||
self._client.set_access_token(access_token)
|
||||
return self._client
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the Tibber component."""
|
||||
|
||||
hass.data[DATA_HASS_CONFIG] = config
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
|
||||
async_setup_services(hass)
|
||||
|
||||
@@ -37,45 +87,100 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up a config entry."""
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
api_type = entry.data.get(CONF_API_TYPE, API_TYPE_GRAPHQL)
|
||||
|
||||
if api_type == API_TYPE_DATA_API:
|
||||
return await _async_setup_data_api_entry(hass, entry)
|
||||
|
||||
return await _async_setup_graphql_entry(hass, entry)
|
||||
|
||||
|
||||
async def _async_setup_graphql_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up the legacy GraphQL Tibber entry."""
|
||||
|
||||
tibber_connection = tibber.Tibber(
|
||||
access_token=entry.data[CONF_ACCESS_TOKEN],
|
||||
websession=async_get_clientsession(hass),
|
||||
time_zone=dt_util.get_default_time_zone(),
|
||||
ssl=ssl_util.get_default_context(),
|
||||
)
|
||||
hass.data[DOMAIN] = tibber_connection
|
||||
|
||||
async def _close(event: Event) -> None:
|
||||
runtime = TibberGraphQLRuntimeData(tibber_connection)
|
||||
entry.runtime_data = runtime
|
||||
hass.data[DOMAIN][API_TYPE_GRAPHQL] = runtime
|
||||
|
||||
async def _close(_event: Event) -> None:
|
||||
await tibber_connection.rt_disconnect()
|
||||
|
||||
entry.async_on_unload(hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _close))
|
||||
|
||||
try:
|
||||
await tibber_connection.update_info()
|
||||
|
||||
except (
|
||||
TimeoutError,
|
||||
aiohttp.ClientError,
|
||||
tibber.RetryableHttpExceptionError,
|
||||
) as err:
|
||||
raise ConfigEntryNotReady("Unable to connect") from err
|
||||
except tibber.InvalidLoginError as exp:
|
||||
_LOGGER.error("Failed to login. %s", exp)
|
||||
except tibber.InvalidLoginError as err:
|
||||
_LOGGER.error("Failed to login to Tibber GraphQL API: %s", err)
|
||||
return False
|
||||
except tibber.FatalHttpExceptionError:
|
||||
except tibber.FatalHttpExceptionError as err:
|
||||
_LOGGER.error("Fatal error communicating with Tibber GraphQL API: %s", err)
|
||||
return False
|
||||
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
await hass.config_entries.async_forward_entry_setups(entry, GRAPHQL_PLATFORMS)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def _async_setup_data_api_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up a Tibber Data API entry."""
|
||||
|
||||
try:
|
||||
implementation = await async_get_config_entry_implementation(hass, entry)
|
||||
except ImplementationUnavailableError as err:
|
||||
raise ConfigEntryNotReady(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="oauth2_implementation_unavailable",
|
||||
) from err
|
||||
|
||||
session = OAuth2Session(hass, entry, implementation)
|
||||
|
||||
try:
|
||||
await session.async_ensure_token_valid()
|
||||
except ClientResponseError as err:
|
||||
if 400 <= err.status < 500:
|
||||
raise ConfigEntryAuthFailed(
|
||||
"OAuth session is not valid, reauthentication required"
|
||||
) from err
|
||||
raise ConfigEntryNotReady from err
|
||||
except ClientError as err:
|
||||
raise ConfigEntryNotReady from err
|
||||
|
||||
runtime = TibberDataAPIRuntimeData(session=session)
|
||||
entry.runtime_data = runtime
|
||||
hass.data[DOMAIN][API_TYPE_DATA_API] = runtime
|
||||
|
||||
await hass.config_entries.async_forward_entry_setups(entry, DATA_API_PLATFORMS)
|
||||
return True
|
||||
|
||||
|
||||
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
|
||||
"""Unload a config entry."""
|
||||
api_type = config_entry.data.get(CONF_API_TYPE, API_TYPE_GRAPHQL)
|
||||
unload_ok = await hass.config_entries.async_unload_platforms(
|
||||
config_entry, PLATFORMS
|
||||
config_entry,
|
||||
GRAPHQL_PLATFORMS if api_type == API_TYPE_GRAPHQL else DATA_API_PLATFORMS,
|
||||
)
|
||||
|
||||
if unload_ok:
|
||||
tibber_connection = hass.data[DOMAIN]
|
||||
await tibber_connection.rt_disconnect()
|
||||
if api_type == API_TYPE_GRAPHQL:
|
||||
runtime = hass.data[DOMAIN].get(api_type)
|
||||
if runtime:
|
||||
tibber_connection = runtime.tibber
|
||||
await tibber_connection.rt_disconnect()
|
||||
|
||||
hass.data[DOMAIN].pop(api_type, None)
|
||||
return unload_ok
|
||||
|
||||
15
homeassistant/components/tibber/application_credentials.py
Normal file
15
homeassistant/components/tibber/application_credentials.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""Application credentials platform for Tibber."""
|
||||
|
||||
from homeassistant.components.application_credentials import AuthorizationServer
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
AUTHORIZE_URL = "https://thewall.tibber.com/connect/authorize"
|
||||
TOKEN_URL = "https://thewall.tibber.com/connect/token"
|
||||
|
||||
|
||||
async def async_get_authorization_server(hass: HomeAssistant) -> AuthorizationServer:
|
||||
"""Return authorization server for Tibber Data API."""
|
||||
return AuthorizationServer(
|
||||
authorize_url=AUTHORIZE_URL,
|
||||
token_url=TOKEN_URL,
|
||||
)
|
||||
@@ -2,36 +2,118 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import tibber
|
||||
from tibber.data_api import TibberDataAPI
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlowResult
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.config_entry_oauth2_flow import (
|
||||
AbstractOAuth2FlowHandler,
|
||||
async_get_config_entry_implementation,
|
||||
async_get_implementations,
|
||||
)
|
||||
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
|
||||
|
||||
from .const import DOMAIN
|
||||
from .const import (
|
||||
API_TYPE_DATA_API,
|
||||
API_TYPE_GRAPHQL,
|
||||
CONF_API_TYPE,
|
||||
DATA_API_DEFAULT_SCOPES,
|
||||
DOMAIN,
|
||||
)
|
||||
|
||||
TYPE_SELECTOR = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_API_TYPE, default=API_TYPE_GRAPHQL): SelectSelector(
|
||||
SelectSelectorConfig(
|
||||
options=[API_TYPE_GRAPHQL, API_TYPE_DATA_API],
|
||||
translation_key="api_type",
|
||||
)
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
GRAPHQL_SCHEMA = vol.Schema({vol.Required(CONF_ACCESS_TOKEN): str})
|
||||
|
||||
DATA_SCHEMA = vol.Schema({vol.Required(CONF_ACCESS_TOKEN): str})
|
||||
ERR_TIMEOUT = "timeout"
|
||||
ERR_CLIENT = "cannot_connect"
|
||||
ERR_TOKEN = "invalid_access_token"
|
||||
TOKEN_URL = "https://developer.tibber.com/settings/access-token"
|
||||
DATA_API_DOC_URL = "https://data-api.tibber.com/docs/auth/"
|
||||
APPLICATION_CREDENTIALS_DOC_URL = (
|
||||
"https://www.home-assistant.io/integrations/application_credentials/"
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TibberConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
class TibberConfigFlow(AbstractOAuth2FlowHandler, domain=DOMAIN):
|
||||
"""Handle a config flow for Tibber integration."""
|
||||
|
||||
DOMAIN = DOMAIN
|
||||
VERSION = 1
|
||||
MINOR_VERSION = 1
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the config flow."""
|
||||
super().__init__()
|
||||
self._api_type: str | None = None
|
||||
self._data_api_home_ids: list[str] = []
|
||||
self._data_api_user_sub: str | None = None
|
||||
self._reauth_confirmed: bool = False
|
||||
|
||||
@property
|
||||
def logger(self) -> logging.Logger:
|
||||
"""Return the logger."""
|
||||
return _LOGGER
|
||||
|
||||
@property
|
||||
def extra_authorize_data(self) -> dict:
|
||||
"""Extra data appended to the authorize URL."""
|
||||
if self._api_type != API_TYPE_DATA_API:
|
||||
return super().extra_authorize_data
|
||||
return {
|
||||
**super().extra_authorize_data,
|
||||
"scope": " ".join(DATA_API_DEFAULT_SCOPES),
|
||||
}
|
||||
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle the initial step."""
|
||||
|
||||
self._async_abort_entries_match()
|
||||
if user_input is None:
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=TYPE_SELECTOR,
|
||||
description_placeholders={"url": DATA_API_DOC_URL},
|
||||
)
|
||||
|
||||
self._api_type = user_input[CONF_API_TYPE]
|
||||
|
||||
if self._api_type == API_TYPE_GRAPHQL:
|
||||
return await self.async_step_graphql()
|
||||
|
||||
return await self.async_step_data_api()
|
||||
|
||||
async def async_step_graphql(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle GraphQL token based configuration."""
|
||||
|
||||
if self.source != SOURCE_REAUTH:
|
||||
for entry in self._async_current_entries(include_ignore=False):
|
||||
if entry.entry_id == self.context.get("entry_id"):
|
||||
continue
|
||||
if entry.data.get(CONF_API_TYPE, API_TYPE_GRAPHQL) == API_TYPE_GRAPHQL:
|
||||
return self.async_abort(reason="already_configured")
|
||||
|
||||
if user_input is not None:
|
||||
access_token = user_input[CONF_ACCESS_TOKEN].replace(" ", "")
|
||||
@@ -58,24 +140,146 @@ class TibberConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
|
||||
if errors:
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=DATA_SCHEMA,
|
||||
step_id="graphql",
|
||||
data_schema=GRAPHQL_SCHEMA,
|
||||
description_placeholders={"url": TOKEN_URL},
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
unique_id = tibber_connection.user_id
|
||||
await self.async_set_unique_id(unique_id)
|
||||
|
||||
if self.source == SOURCE_REAUTH:
|
||||
self._abort_if_unique_id_mismatch(reason="wrong_account")
|
||||
return self.async_update_reload_and_abort(
|
||||
self._get_reauth_entry(),
|
||||
data_updates={
|
||||
CONF_API_TYPE: API_TYPE_GRAPHQL,
|
||||
CONF_ACCESS_TOKEN: access_token,
|
||||
},
|
||||
title=tibber_connection.name,
|
||||
)
|
||||
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
data = {
|
||||
CONF_API_TYPE: API_TYPE_GRAPHQL,
|
||||
CONF_ACCESS_TOKEN: access_token,
|
||||
}
|
||||
|
||||
return self.async_create_entry(
|
||||
title=tibber_connection.name,
|
||||
data={CONF_ACCESS_TOKEN: access_token},
|
||||
data=data,
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=DATA_SCHEMA,
|
||||
step_id="graphql",
|
||||
data_schema=GRAPHQL_SCHEMA,
|
||||
description_placeholders={"url": TOKEN_URL},
|
||||
errors={},
|
||||
)
|
||||
|
||||
async def async_step_data_api(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle the Data API OAuth configuration."""
|
||||
|
||||
implementations = await async_get_implementations(self.hass, self.DOMAIN)
|
||||
if not implementations:
|
||||
return self.async_abort(
|
||||
reason="missing_credentials",
|
||||
description_placeholders={
|
||||
"application_credentials_url": APPLICATION_CREDENTIALS_DOC_URL,
|
||||
"data_api_url": DATA_API_DOC_URL,
|
||||
},
|
||||
)
|
||||
|
||||
if self.source != SOURCE_REAUTH:
|
||||
for entry in self._async_current_entries(include_ignore=False):
|
||||
if entry.entry_id == self.context.get("entry_id"):
|
||||
continue
|
||||
if entry.data.get(CONF_API_TYPE, API_TYPE_GRAPHQL) == API_TYPE_DATA_API:
|
||||
return self.async_abort(reason="already_configured")
|
||||
|
||||
return await self.async_step_pick_implementation(user_input)
|
||||
|
||||
async def async_oauth_create_entry(self, data: dict) -> ConfigFlowResult:
|
||||
"""Finalize the OAuth flow and create the config entry."""
|
||||
|
||||
assert self._api_type == API_TYPE_DATA_API
|
||||
|
||||
token: dict[str, Any] = data["token"]
|
||||
|
||||
client = TibberDataAPI(
|
||||
token[CONF_ACCESS_TOKEN],
|
||||
websession=async_get_clientsession(self.hass),
|
||||
)
|
||||
|
||||
try:
|
||||
userinfo = await client.get_userinfo()
|
||||
except (
|
||||
tibber.InvalidLoginError,
|
||||
tibber.FatalHttpExceptionError,
|
||||
) as err:
|
||||
self.logger.error("Authentication failed against Data API: %s", err)
|
||||
return self.async_abort(reason="oauth_invalid_token")
|
||||
except (aiohttp.ClientError, TimeoutError) as err:
|
||||
self.logger.error("Error retrieving homes via Data API: %s", err)
|
||||
return self.async_abort(reason="cannot_connect")
|
||||
|
||||
unique_id = userinfo["email"]
|
||||
title = userinfo["email"]
|
||||
await self.async_set_unique_id(unique_id)
|
||||
if self.source == SOURCE_REAUTH:
|
||||
reauth_entry = self._get_reauth_entry()
|
||||
self._abort_if_unique_id_mismatch(
|
||||
reason="wrong_account",
|
||||
description_placeholders={"email": reauth_entry.unique_id or ""},
|
||||
)
|
||||
return self.async_update_reload_and_abort(
|
||||
reauth_entry,
|
||||
data_updates={
|
||||
CONF_API_TYPE: API_TYPE_DATA_API,
|
||||
"auth_implementation": data["auth_implementation"],
|
||||
CONF_TOKEN: token,
|
||||
},
|
||||
title=title,
|
||||
)
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
entry_data: dict[str, Any] = {
|
||||
CONF_API_TYPE: API_TYPE_DATA_API,
|
||||
"auth_implementation": data["auth_implementation"],
|
||||
CONF_TOKEN: token,
|
||||
}
|
||||
return self.async_create_entry(
|
||||
title=title,
|
||||
data=entry_data,
|
||||
)
|
||||
|
||||
async def async_step_reauth(
|
||||
self, entry_data: Mapping[str, Any]
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle reauthentication."""
|
||||
|
||||
api_type = entry_data.get(CONF_API_TYPE, API_TYPE_GRAPHQL)
|
||||
self._api_type = api_type
|
||||
|
||||
if api_type == API_TYPE_DATA_API:
|
||||
self.flow_impl = await async_get_config_entry_implementation(
|
||||
self.hass, self._get_reauth_entry()
|
||||
)
|
||||
return await self.async_step_auth()
|
||||
|
||||
self.context["title_placeholders"] = {"name": self._get_reauth_entry().title}
|
||||
return await self.async_step_reauth_confirm()
|
||||
|
||||
async def async_step_reauth_confirm(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Confirm the reauth dialog for GraphQL entries."""
|
||||
if user_input is None and not self._reauth_confirmed:
|
||||
self._reauth_confirmed = True
|
||||
return self.async_show_form(step_id="reauth_confirm")
|
||||
|
||||
return await self.async_step_graphql()
|
||||
|
||||
@@ -3,3 +3,19 @@
|
||||
DATA_HASS_CONFIG = "tibber_hass_config"
|
||||
DOMAIN = "tibber"
|
||||
MANUFACTURER = "Tibber"
|
||||
CONF_API_TYPE = "api_type"
|
||||
API_TYPE_GRAPHQL = "graphql"
|
||||
API_TYPE_DATA_API = "data_api"
|
||||
DATA_API_DEFAULT_SCOPES = [
|
||||
"openid",
|
||||
"profile",
|
||||
"email",
|
||||
"offline_access",
|
||||
"data-api-user-read",
|
||||
"data-api-chargers-read",
|
||||
"data-api-energy-systems-read",
|
||||
"data-api-homes-read",
|
||||
"data-api-thermostats-read",
|
||||
"data-api-vehicles-read",
|
||||
"data-api-inverters-read",
|
||||
]
|
||||
|
||||
@@ -4,9 +4,11 @@ from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
from typing import cast
|
||||
from typing import Any, cast
|
||||
|
||||
from aiohttp.client_exceptions import ClientError
|
||||
import tibber
|
||||
from tibber.data_api import TibberDataAPI, TibberDevice
|
||||
|
||||
from homeassistant.components.recorder import get_instance
|
||||
from homeassistant.components.recorder.models import (
|
||||
@@ -22,6 +24,7 @@ from homeassistant.components.recorder.statistics import (
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import UnitOfEnergy
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.util.unit_conversion import EnergyConverter
|
||||
@@ -187,3 +190,48 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
unit_of_measurement=unit,
|
||||
)
|
||||
async_add_external_statistics(self.hass, metadata, statistics)
|
||||
|
||||
|
||||
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
|
||||
"""Fetch and cache Tibber Data API device capabilities."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
entry: ConfigEntry,
|
||||
runtime_data: Any,
|
||||
) -> None:
|
||||
"""Initialize the coordinator."""
|
||||
super().__init__(
|
||||
hass,
|
||||
_LOGGER,
|
||||
name=f"{DOMAIN} Data API",
|
||||
update_interval=timedelta(minutes=1),
|
||||
config_entry=entry,
|
||||
)
|
||||
self._runtime_data = runtime_data
|
||||
|
||||
async def _async_get_client(self) -> TibberDataAPI:
|
||||
"""Get the Tibber Data API client with error handling."""
|
||||
try:
|
||||
return cast(
|
||||
TibberDataAPI,
|
||||
await self._runtime_data.async_get_client(self.hass),
|
||||
)
|
||||
except ConfigEntryAuthFailed:
|
||||
raise
|
||||
except (ClientError, TimeoutError, tibber.UserAgentMissingError) as err:
|
||||
raise UpdateFailed(
|
||||
f"Unable to create Tibber Data API client: {err}"
|
||||
) from err
|
||||
|
||||
async def _async_setup(self) -> None:
|
||||
"""Initial load of Tibber Data API devices."""
|
||||
client = await self._async_get_client()
|
||||
self.data = await client.get_all_devices()
|
||||
|
||||
async def _async_update_data(self) -> dict[str, TibberDevice]:
|
||||
"""Fetch the latest device capabilities from the Tibber Data API."""
|
||||
client = await self._async_get_client()
|
||||
devices: dict[str, TibberDevice] = await client.update_devices()
|
||||
return devices
|
||||
|
||||
@@ -4,29 +4,72 @@ from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import tibber
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
|
||||
from .const import DOMAIN
|
||||
from .const import API_TYPE_DATA_API, API_TYPE_GRAPHQL, CONF_API_TYPE, DOMAIN
|
||||
|
||||
|
||||
async def async_get_config_entry_diagnostics(
|
||||
hass: HomeAssistant, config_entry: ConfigEntry
|
||||
) -> dict[str, Any]:
|
||||
"""Return diagnostics for a config entry."""
|
||||
tibber_connection: tibber.Tibber = hass.data[DOMAIN]
|
||||
|
||||
api_type = config_entry.data.get(CONF_API_TYPE, API_TYPE_GRAPHQL)
|
||||
domain_data = hass.data.get(DOMAIN, {})
|
||||
|
||||
if api_type == API_TYPE_GRAPHQL:
|
||||
tibber_connection: tibber.Tibber = domain_data[API_TYPE_GRAPHQL].tibber
|
||||
return {
|
||||
"api_type": API_TYPE_GRAPHQL,
|
||||
"homes": [
|
||||
{
|
||||
"last_data_timestamp": home.last_data_timestamp,
|
||||
"has_active_subscription": home.has_active_subscription,
|
||||
"has_real_time_consumption": home.has_real_time_consumption,
|
||||
"last_cons_data_timestamp": home.last_cons_data_timestamp,
|
||||
"country": home.country,
|
||||
}
|
||||
for home in tibber_connection.get_homes(only_active=False)
|
||||
],
|
||||
}
|
||||
|
||||
runtime = domain_data.get(API_TYPE_DATA_API)
|
||||
if runtime is None:
|
||||
return {
|
||||
"api_type": API_TYPE_DATA_API,
|
||||
"devices": [],
|
||||
}
|
||||
|
||||
devices: dict[str, Any] = {}
|
||||
error: str | None = None
|
||||
try:
|
||||
devices = await (await runtime.async_get_client(hass)).get_all_devices()
|
||||
except (
|
||||
ConfigEntryAuthFailed,
|
||||
TimeoutError,
|
||||
aiohttp.ClientError,
|
||||
tibber.InvalidLoginError,
|
||||
tibber.RetryableHttpExceptionError,
|
||||
tibber.FatalHttpExceptionError,
|
||||
) as err:
|
||||
devices = {}
|
||||
error = f"Unexpected error: {type(err).__name__}"
|
||||
|
||||
return {
|
||||
"homes": [
|
||||
"api_type": API_TYPE_DATA_API,
|
||||
"error": error,
|
||||
"devices": [
|
||||
{
|
||||
"last_data_timestamp": home.last_data_timestamp,
|
||||
"has_active_subscription": home.has_active_subscription,
|
||||
"has_real_time_consumption": home.has_real_time_consumption,
|
||||
"last_cons_data_timestamp": home.last_cons_data_timestamp,
|
||||
"country": home.country,
|
||||
"id": device.id,
|
||||
"name": device.name,
|
||||
"brand": device.brand,
|
||||
"model": device.model,
|
||||
}
|
||||
for home in tibber_connection.get_homes(only_active=False)
|
||||
]
|
||||
for device in devices.values()
|
||||
],
|
||||
}
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
"name": "Tibber",
|
||||
"codeowners": ["@danielhiversen"],
|
||||
"config_flow": true,
|
||||
"dependencies": ["recorder"],
|
||||
"dependencies": ["application_credentials", "recorder"],
|
||||
"documentation": "https://www.home-assistant.io/integrations/tibber",
|
||||
"iot_class": "cloud_polling",
|
||||
"loggers": ["tibber"],
|
||||
"requirements": ["pyTibber==0.32.2"]
|
||||
"requirements": ["pyTibber==0.33.1"]
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||
|
||||
from . import DOMAIN
|
||||
from .const import API_TYPE_GRAPHQL, DOMAIN
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
@@ -39,7 +39,7 @@ class TibberNotificationEntity(NotifyEntity):
|
||||
|
||||
async def async_send_message(self, message: str, title: str | None = None) -> None:
|
||||
"""Send a message to Tibber devices."""
|
||||
tibber_connection: Tibber = self.hass.data[DOMAIN]
|
||||
tibber_connection: Tibber = self.hass.data[DOMAIN][API_TYPE_GRAPHQL].tibber
|
||||
try:
|
||||
await tibber_connection.send_notification(
|
||||
title or ATTR_TITLE_DEFAULT, message
|
||||
|
||||
@@ -10,7 +10,8 @@ from random import randrange
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import tibber
|
||||
from tibber import FatalHttpExceptionError, RetryableHttpExceptionError, TibberHome
|
||||
from tibber.data_api import TibberDevice
|
||||
|
||||
from homeassistant.components.sensor import (
|
||||
SensorDeviceClass,
|
||||
@@ -27,6 +28,7 @@ from homeassistant.const import (
|
||||
UnitOfElectricCurrent,
|
||||
UnitOfElectricPotential,
|
||||
UnitOfEnergy,
|
||||
UnitOfLength,
|
||||
UnitOfPower,
|
||||
)
|
||||
from homeassistant.core import Event, HomeAssistant, callback
|
||||
@@ -41,8 +43,14 @@ from homeassistant.helpers.update_coordinator import (
|
||||
)
|
||||
from homeassistant.util import Throttle, dt as dt_util
|
||||
|
||||
from .const import DOMAIN, MANUFACTURER
|
||||
from .coordinator import TibberDataCoordinator
|
||||
from .const import (
|
||||
API_TYPE_DATA_API,
|
||||
API_TYPE_GRAPHQL,
|
||||
CONF_API_TYPE,
|
||||
DOMAIN,
|
||||
MANUFACTURER,
|
||||
)
|
||||
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -260,6 +268,58 @@ SENSORS: tuple[SensorEntityDescription, ...] = (
|
||||
)
|
||||
|
||||
|
||||
DATA_API_SENSORS: tuple[SensorEntityDescription, ...] = (
|
||||
SensorEntityDescription(
|
||||
key="storage.stateOfCharge",
|
||||
translation_key="storage_state_of_charge",
|
||||
device_class=SensorDeviceClass.BATTERY,
|
||||
native_unit_of_measurement=PERCENTAGE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="storage.targetStateOfCharge",
|
||||
translation_key="storage_target_state_of_charge",
|
||||
device_class=SensorDeviceClass.BATTERY,
|
||||
native_unit_of_measurement=PERCENTAGE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="connector.status",
|
||||
translation_key="connector_status",
|
||||
device_class=SensorDeviceClass.ENUM,
|
||||
options=["connected", "disconnected", "unknown"],
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="charging.status",
|
||||
translation_key="charging_status",
|
||||
device_class=SensorDeviceClass.ENUM,
|
||||
options=["charging", "idle", "unknown"],
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="range.remaining",
|
||||
translation_key="range_remaining",
|
||||
device_class=SensorDeviceClass.DISTANCE,
|
||||
native_unit_of_measurement=UnitOfLength.KILOMETERS,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
suggested_display_precision=1,
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="charging.current.max",
|
||||
translation_key="charging_current_max",
|
||||
device_class=SensorDeviceClass.CURRENT,
|
||||
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
SensorEntityDescription(
|
||||
key="charging.current.offlineFallback",
|
||||
translation_key="charging_current_offline_fallback",
|
||||
device_class=SensorDeviceClass.CURRENT,
|
||||
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
entry: ConfigEntry,
|
||||
@@ -267,7 +327,11 @@ async def async_setup_entry(
|
||||
) -> None:
|
||||
"""Set up the Tibber sensor."""
|
||||
|
||||
tibber_connection = hass.data[DOMAIN]
|
||||
if entry.data.get(CONF_API_TYPE, API_TYPE_GRAPHQL) == API_TYPE_DATA_API:
|
||||
await _async_setup_data_api_sensors(hass, entry, async_add_entities)
|
||||
return
|
||||
|
||||
tibber_connection = hass.data[DOMAIN][API_TYPE_GRAPHQL].tibber
|
||||
|
||||
entity_registry = er.async_get(hass)
|
||||
device_registry = dr.async_get(hass)
|
||||
@@ -280,7 +344,11 @@ async def async_setup_entry(
|
||||
except TimeoutError as err:
|
||||
_LOGGER.error("Timeout connecting to Tibber home: %s ", err)
|
||||
raise PlatformNotReady from err
|
||||
except (tibber.RetryableHttpExceptionError, aiohttp.ClientError) as err:
|
||||
except (
|
||||
RetryableHttpExceptionError,
|
||||
FatalHttpExceptionError,
|
||||
aiohttp.ClientError,
|
||||
) as err:
|
||||
_LOGGER.error("Error connecting to Tibber home: %s ", err)
|
||||
raise PlatformNotReady from err
|
||||
|
||||
@@ -328,14 +396,94 @@ async def async_setup_entry(
|
||||
async_add_entities(entities, True)
|
||||
|
||||
|
||||
async def _async_setup_data_api_sensors(
|
||||
hass: HomeAssistant,
|
||||
entry: ConfigEntry,
|
||||
async_add_entities: AddConfigEntryEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up sensors backed by the Tibber Data API."""
|
||||
|
||||
domain_data = hass.data.get(DOMAIN, {})
|
||||
runtime = domain_data[API_TYPE_DATA_API]
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, entry, runtime)
|
||||
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
|
||||
entities: list[TibberDataAPISensor] = []
|
||||
api_sensors = {sensor.key: sensor for sensor in DATA_API_SENSORS}
|
||||
|
||||
for device in coordinator.data.values():
|
||||
for sensor in device.sensors:
|
||||
description: SensorEntityDescription | None = api_sensors.get(sensor.id)
|
||||
if description is None:
|
||||
_LOGGER.error("Sensor %s not found", sensor)
|
||||
continue
|
||||
entities.append(
|
||||
TibberDataAPISensor(
|
||||
coordinator, device, description, sensor.description
|
||||
)
|
||||
)
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
class TibberDataAPISensor(CoordinatorEntity[TibberDataAPICoordinator], SensorEntity):
|
||||
"""Representation of a Tibber Data API capability sensor."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: TibberDataAPICoordinator,
|
||||
device: TibberDevice,
|
||||
entity_description: SensorEntityDescription,
|
||||
name: str,
|
||||
) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(coordinator)
|
||||
|
||||
self._device_id: str = device.id
|
||||
self.entity_description = entity_description
|
||||
self._attr_name = name
|
||||
|
||||
self._attr_unique_id = f"{device.external_id}_{self.entity_description.key}"
|
||||
|
||||
self._attr_device_info = DeviceInfo(
|
||||
identifiers={(DOMAIN, device.external_id)},
|
||||
name=device.name,
|
||||
manufacturer=device.brand,
|
||||
model=device.model,
|
||||
)
|
||||
|
||||
@property
|
||||
def native_value(
|
||||
self,
|
||||
) -> StateType:
|
||||
"""Return the value reported by the device."""
|
||||
device = self.coordinator.data.get(self._device_id)
|
||||
if device is None:
|
||||
return None
|
||||
|
||||
for sensor in device.sensors:
|
||||
if sensor.id == self.entity_description.key:
|
||||
return sensor.value
|
||||
return None
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
"""Return whether the sensor is available."""
|
||||
device = self.coordinator.data.get(self._device_id)
|
||||
if device is None:
|
||||
return False
|
||||
return self.native_value is not None
|
||||
|
||||
|
||||
class TibberSensor(SensorEntity):
|
||||
"""Representation of a generic Tibber sensor."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
|
||||
def __init__(
|
||||
self, *args: Any, tibber_home: tibber.TibberHome, **kwargs: Any
|
||||
) -> None:
|
||||
def __init__(self, *args: Any, tibber_home: TibberHome, **kwargs: Any) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(*args, **kwargs)
|
||||
self._tibber_home = tibber_home
|
||||
@@ -366,7 +514,7 @@ class TibberSensorElPrice(TibberSensor):
|
||||
_attr_state_class = SensorStateClass.MEASUREMENT
|
||||
_attr_translation_key = "electricity_price"
|
||||
|
||||
def __init__(self, tibber_home: tibber.TibberHome) -> None:
|
||||
def __init__(self, tibber_home: TibberHome) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(tibber_home=tibber_home)
|
||||
self._last_updated: datetime.datetime | None = None
|
||||
@@ -443,7 +591,7 @@ class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tibber_home: tibber.TibberHome,
|
||||
tibber_home: TibberHome,
|
||||
coordinator: TibberDataCoordinator,
|
||||
entity_description: SensorEntityDescription,
|
||||
) -> None:
|
||||
@@ -470,7 +618,7 @@ class TibberSensorRT(TibberSensor, CoordinatorEntity["TibberRtDataCoordinator"])
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tibber_home: tibber.TibberHome,
|
||||
tibber_home: TibberHome,
|
||||
description: SensorEntityDescription,
|
||||
initial_state: float,
|
||||
coordinator: TibberRtDataCoordinator,
|
||||
@@ -532,7 +680,7 @@ class TibberRtEntityCreator:
|
||||
def __init__(
|
||||
self,
|
||||
async_add_entities: AddConfigEntryEntitiesCallback,
|
||||
tibber_home: tibber.TibberHome,
|
||||
tibber_home: TibberHome,
|
||||
entity_registry: er.EntityRegistry,
|
||||
) -> None:
|
||||
"""Initialize the data handler."""
|
||||
@@ -618,7 +766,7 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntry,
|
||||
add_sensor_callback: Callable[[TibberRtDataCoordinator, Any], None],
|
||||
tibber_home: tibber.TibberHome,
|
||||
tibber_home: TibberHome,
|
||||
) -> None:
|
||||
"""Initialize the data handler."""
|
||||
self._add_sensor_callback = add_sensor_callback
|
||||
|
||||
@@ -18,7 +18,7 @@ from homeassistant.core import (
|
||||
from homeassistant.exceptions import ServiceValidationError
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .const import DOMAIN
|
||||
from .const import API_TYPE_GRAPHQL, DOMAIN
|
||||
|
||||
PRICE_SERVICE_NAME = "get_prices"
|
||||
ATTR_START: Final = "start"
|
||||
@@ -33,7 +33,15 @@ SERVICE_SCHEMA: Final = vol.Schema(
|
||||
|
||||
|
||||
async def __get_prices(call: ServiceCall) -> ServiceResponse:
|
||||
tibber_connection = call.hass.data[DOMAIN]
|
||||
domain_data = call.hass.data.get(DOMAIN, {})
|
||||
runtime = domain_data.get(API_TYPE_GRAPHQL)
|
||||
if runtime is None:
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="graphql_required",
|
||||
)
|
||||
|
||||
tibber_connection = runtime.tibber
|
||||
|
||||
start = __get_date(call.data.get(ATTR_START), "start")
|
||||
end = __get_date(call.data.get(ATTR_END), "end")
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
{
|
||||
"config": {
|
||||
"abort": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]"
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
"missing_configuration": "[%key:common::config_flow::abort::oauth2_missing_configuration%]",
|
||||
"missing_credentials": "Add Tibber Data API application credentials under application credentials before continuing. See {application_credentials_url} for guidance and {data_api_url} for API documentation.",
|
||||
"oauth_invalid_token": "[%key:common::config_flow::abort::oauth2_error%]",
|
||||
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
|
||||
"wrong_account": "The connected account does not match {email}. Sign in with the same Tibber account and try again."
|
||||
},
|
||||
"error": {
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
@@ -9,11 +15,21 @@
|
||||
"timeout": "[%key:common::config_flow::error::timeout_connect%]"
|
||||
},
|
||||
"step": {
|
||||
"user": {
|
||||
"graphql": {
|
||||
"data": {
|
||||
"access_token": "[%key:common::config_flow::data::access_token%]"
|
||||
},
|
||||
"description": "Enter your access token from {url}"
|
||||
},
|
||||
"reauth_confirm": {
|
||||
"description": "Reconnect your Tibber account to refresh access.",
|
||||
"title": "[%key:common::config_flow::title::reauth%]"
|
||||
},
|
||||
"user": {
|
||||
"data": {
|
||||
"api_type": "API type"
|
||||
},
|
||||
"description": "Select which Tibber API you want to configure. See {url} for documentation."
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -40,6 +56,37 @@
|
||||
"average_power": {
|
||||
"name": "Average power"
|
||||
},
|
||||
"battery_battery_power": {
|
||||
"name": "Battery power"
|
||||
},
|
||||
"battery_battery_state_of_charge": {
|
||||
"name": "Battery state of charge"
|
||||
},
|
||||
"battery_stored_energy": {
|
||||
"name": "Stored energy"
|
||||
},
|
||||
"charging_current_max": {
|
||||
"name": "Maximum charging current"
|
||||
},
|
||||
"charging_current_offline_fallback": {
|
||||
"name": "Offline fallback charging current"
|
||||
},
|
||||
"charging_status": {
|
||||
"name": "Charging status",
|
||||
"state": {
|
||||
"charging": "Charging",
|
||||
"idle": "Idle",
|
||||
"unknown": "Unknown"
|
||||
}
|
||||
},
|
||||
"connector_status": {
|
||||
"name": "Connector status",
|
||||
"state": {
|
||||
"connected": "Connected",
|
||||
"disconnected": "Disconnected",
|
||||
"unknown": "Unknown"
|
||||
}
|
||||
},
|
||||
"current_l1": {
|
||||
"name": "Current L1"
|
||||
},
|
||||
@@ -55,6 +102,30 @@
|
||||
"estimated_hour_consumption": {
|
||||
"name": "Estimated consumption current hour"
|
||||
},
|
||||
"ev_charger_charge_current": {
|
||||
"name": "Charge current"
|
||||
},
|
||||
"ev_charger_charging_state": {
|
||||
"name": "Charging state"
|
||||
},
|
||||
"ev_charger_power": {
|
||||
"name": "Charging power"
|
||||
},
|
||||
"ev_charger_session_energy": {
|
||||
"name": "Session energy"
|
||||
},
|
||||
"ev_charger_total_energy": {
|
||||
"name": "Total energy"
|
||||
},
|
||||
"heat_pump_measured_temperature": {
|
||||
"name": "Measured temperature"
|
||||
},
|
||||
"heat_pump_operation_mode": {
|
||||
"name": "Operation mode"
|
||||
},
|
||||
"heat_pump_target_temperature": {
|
||||
"name": "Target temperature"
|
||||
},
|
||||
"last_meter_consumption": {
|
||||
"name": "Last meter consumption"
|
||||
},
|
||||
@@ -88,9 +159,33 @@
|
||||
"power_production": {
|
||||
"name": "Power production"
|
||||
},
|
||||
"range_remaining": {
|
||||
"name": "Remaining range"
|
||||
},
|
||||
"signal_strength": {
|
||||
"name": "Signal strength"
|
||||
},
|
||||
"solar_power": {
|
||||
"name": "Solar power"
|
||||
},
|
||||
"solar_power_production": {
|
||||
"name": "Power production"
|
||||
},
|
||||
"storage_state_of_charge": {
|
||||
"name": "Storage state of charge"
|
||||
},
|
||||
"storage_target_state_of_charge": {
|
||||
"name": "Storage target state of charge"
|
||||
},
|
||||
"thermostat_measured_temperature": {
|
||||
"name": "Measured temperature"
|
||||
},
|
||||
"thermostat_operation_mode": {
|
||||
"name": "Operation mode"
|
||||
},
|
||||
"thermostat_target_temperature": {
|
||||
"name": "Target temperature"
|
||||
},
|
||||
"voltage_phase1": {
|
||||
"name": "Voltage phase1"
|
||||
},
|
||||
@@ -103,13 +198,27 @@
|
||||
}
|
||||
},
|
||||
"exceptions": {
|
||||
"graphql_required": {
|
||||
"message": "Configure the Tibber GraphQL API before calling this service."
|
||||
},
|
||||
"invalid_date": {
|
||||
"message": "Invalid datetime provided {date}"
|
||||
},
|
||||
"oauth2_implementation_unavailable": {
|
||||
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
|
||||
},
|
||||
"send_message_timeout": {
|
||||
"message": "Timeout sending message with Tibber"
|
||||
}
|
||||
},
|
||||
"selector": {
|
||||
"api_type": {
|
||||
"options": {
|
||||
"data_api": "Data API (OAuth2)",
|
||||
"graphql": "GraphQL API (access token)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"services": {
|
||||
"get_prices": {
|
||||
"description": "Fetches hourly energy prices including price level.",
|
||||
|
||||
@@ -37,6 +37,7 @@ APPLICATION_CREDENTIALS = [
|
||||
"smartthings",
|
||||
"spotify",
|
||||
"tesla_fleet",
|
||||
"tibber",
|
||||
"twitch",
|
||||
"volvo",
|
||||
"weheat",
|
||||
|
||||
2
requirements_all.txt
generated
2
requirements_all.txt
generated
@@ -1839,7 +1839,7 @@ pyRFXtrx==0.31.1
|
||||
pySDCP==1
|
||||
|
||||
# homeassistant.components.tibber
|
||||
pyTibber==0.32.2
|
||||
pyTibber==0.33.1
|
||||
|
||||
# homeassistant.components.dlink
|
||||
pyW215==0.8.0
|
||||
|
||||
2
requirements_test_all.txt
generated
2
requirements_test_all.txt
generated
@@ -1549,7 +1549,7 @@ pyHomee==1.3.8
|
||||
pyRFXtrx==0.31.1
|
||||
|
||||
# homeassistant.components.tibber
|
||||
pyTibber==0.32.2
|
||||
pyTibber==0.33.1
|
||||
|
||||
# homeassistant.components.dlink
|
||||
pyW215==0.8.0
|
||||
|
||||
@@ -4,21 +4,67 @@ from collections.abc import AsyncGenerator
|
||||
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
|
||||
|
||||
import pytest
|
||||
import tibber
|
||||
|
||||
from homeassistant.components.application_credentials import (
|
||||
ClientCredential,
|
||||
async_import_client_credential,
|
||||
)
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.tibber.const import DOMAIN
|
||||
from homeassistant.components.tibber.const import (
|
||||
API_TYPE_GRAPHQL,
|
||||
CONF_API_TYPE,
|
||||
DOMAIN,
|
||||
)
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
def create_tibber_device(
|
||||
device_id: str = "device-id",
|
||||
external_id: str = "external-id",
|
||||
name: str = "Test Device",
|
||||
brand: str = "Tibber",
|
||||
model: str = "Gen1",
|
||||
value: float | None = 72.0,
|
||||
home_id: str = "home-id",
|
||||
) -> tibber.data_api.TibberDevice:
|
||||
"""Create a fake Tibber Data API device."""
|
||||
device_data = {
|
||||
"id": device_id,
|
||||
"externalId": external_id,
|
||||
"info": {
|
||||
"name": name,
|
||||
"brand": brand,
|
||||
"model": model,
|
||||
},
|
||||
"capabilities": [
|
||||
{
|
||||
"id": "storage.stateOfCharge",
|
||||
"value": value,
|
||||
"description": "State of charge",
|
||||
"unit": "%",
|
||||
},
|
||||
{
|
||||
"id": "unknown.sensor.id",
|
||||
"value": None,
|
||||
"description": "Unknown",
|
||||
"unit": "",
|
||||
},
|
||||
],
|
||||
}
|
||||
return tibber.data_api.TibberDevice(device_data, home_id=home_id)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config_entry(hass: HomeAssistant) -> MockConfigEntry:
|
||||
"""Tibber config entry."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_ACCESS_TOKEN: "token"},
|
||||
data={CONF_API_TYPE: API_TYPE_GRAPHQL, CONF_ACCESS_TOKEN: "token"},
|
||||
unique_id="tibber",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
@@ -44,3 +90,15 @@ async def mock_tibber_setup(
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
yield tibber_mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def setup_credentials(recorder_mock: Recorder, hass: HomeAssistant) -> None:
|
||||
"""Set up application credentials for the OAuth flow."""
|
||||
assert await async_setup_component(hass, "application_credentials", {})
|
||||
await async_import_client_credential(
|
||||
hass,
|
||||
DOMAIN,
|
||||
ClientCredential("test-client-id", "test-client-secret"),
|
||||
DOMAIN,
|
||||
)
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
"""Tests for Tibber config flow."""
|
||||
|
||||
from asyncio import TimeoutError
|
||||
from http import HTTPStatus
|
||||
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from aiohttp import ClientError
|
||||
import pytest
|
||||
@@ -13,16 +15,29 @@ from tibber import (
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.tibber.application_credentials import TOKEN_URL
|
||||
from homeassistant.components.tibber.config_flow import (
|
||||
APPLICATION_CREDENTIALS_DOC_URL,
|
||||
DATA_API_DEFAULT_SCOPES,
|
||||
DATA_API_DOC_URL,
|
||||
ERR_CLIENT,
|
||||
ERR_TIMEOUT,
|
||||
ERR_TOKEN,
|
||||
)
|
||||
from homeassistant.components.tibber.const import DOMAIN
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.components.tibber.const import (
|
||||
API_TYPE_DATA_API,
|
||||
API_TYPE_GRAPHQL,
|
||||
CONF_API_TYPE,
|
||||
DOMAIN,
|
||||
)
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResultType
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
from tests.test_util.aiohttp import AiohttpClientMocker
|
||||
from tests.typing import ClientSessionGenerator
|
||||
|
||||
|
||||
@pytest.fixture(name="tibber_setup", autouse=True)
|
||||
def tibber_setup_fixture():
|
||||
@@ -43,10 +58,19 @@ async def test_show_config_form(recorder_mock: Recorder, hass: HomeAssistant) ->
|
||||
|
||||
async def test_create_entry(recorder_mock: Recorder, hass: HomeAssistant) -> None:
|
||||
"""Test create entry from user input."""
|
||||
test_data = {
|
||||
CONF_ACCESS_TOKEN: "valid",
|
||||
}
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_API_TYPE: API_TYPE_GRAPHQL}
|
||||
)
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "graphql"
|
||||
|
||||
test_data = {CONF_ACCESS_TOKEN: "valid"}
|
||||
unique_user_id = "unique_user_id"
|
||||
title = "title"
|
||||
|
||||
@@ -56,13 +80,16 @@ async def test_create_entry(recorder_mock: Recorder, hass: HomeAssistant) -> Non
|
||||
type(tibber_mock).name = PropertyMock(return_value=title)
|
||||
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], test_data
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == title
|
||||
assert result["data"] == test_data
|
||||
assert result["data"] == {
|
||||
CONF_API_TYPE: API_TYPE_GRAPHQL,
|
||||
CONF_ACCESS_TOKEN: "valid",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -79,9 +106,12 @@ async def test_create_entry_exceptions(
|
||||
recorder_mock: Recorder, hass: HomeAssistant, exception, expected_error
|
||||
) -> None:
|
||||
"""Test create entry from user input."""
|
||||
test_data = {
|
||||
CONF_ACCESS_TOKEN: "valid",
|
||||
}
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_API_TYPE: API_TYPE_GRAPHQL}
|
||||
)
|
||||
|
||||
unique_user_id = "unique_user_id"
|
||||
title = "title"
|
||||
@@ -92,8 +122,8 @@ async def test_create_entry_exceptions(
|
||||
type(tibber_mock).name = PropertyMock(return_value=title)
|
||||
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
@@ -104,14 +134,359 @@ async def test_flow_entry_already_exists(
|
||||
recorder_mock: Recorder, hass: HomeAssistant, config_entry
|
||||
) -> None:
|
||||
"""Test user input for config_entry that already exists."""
|
||||
test_data = {
|
||||
CONF_ACCESS_TOKEN: "valid",
|
||||
}
|
||||
|
||||
with patch("tibber.Tibber.update_info", return_value=None):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
|
||||
)
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_API_TYPE: API_TYPE_GRAPHQL}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
|
||||
async def test_data_api_requires_credentials(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test the data API path aborts when no credentials are configured."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_API_TYPE: API_TYPE_DATA_API}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "missing_credentials"
|
||||
assert result["description_placeholders"] == {
|
||||
"application_credentials_url": APPLICATION_CREDENTIALS_DOC_URL,
|
||||
"data_api_url": DATA_API_DOC_URL,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
|
||||
async def test_data_api_extra_authorize_scope(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Ensure the Data API flow requests the default scopes."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_API_TYPE: API_TYPE_DATA_API}
|
||||
)
|
||||
|
||||
handler = hass.config_entries.flow._progress[result["flow_id"]]
|
||||
|
||||
assert handler.extra_authorize_data["scope"] == " ".join(DATA_API_DEFAULT_SCOPES)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
|
||||
async def test_data_api_full_flow(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client_no_auth: ClientSessionGenerator,
|
||||
aioclient_mock: AiohttpClientMocker,
|
||||
) -> None:
|
||||
"""Test configuring the Data API through OAuth."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_API_TYPE: API_TYPE_DATA_API}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.EXTERNAL_STEP
|
||||
authorize_url = result["url"]
|
||||
state = parse_qs(urlparse(authorize_url).query)["state"][0]
|
||||
|
||||
client = await hass_client_no_auth()
|
||||
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
|
||||
assert resp.status == HTTPStatus.OK
|
||||
|
||||
aioclient_mock.post(
|
||||
TOKEN_URL,
|
||||
json={
|
||||
"access_token": "mock-access-token",
|
||||
"refresh_token": "mock-refresh-token",
|
||||
"token_type": "bearer",
|
||||
"expires_in": 3600,
|
||||
},
|
||||
)
|
||||
|
||||
data_api_client = MagicMock()
|
||||
data_api_client.get_userinfo = AsyncMock(
|
||||
return_value={"email": "mock-user@example.com"}
|
||||
)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.tibber.config_flow.TibberDataAPI",
|
||||
return_value=data_api_client,
|
||||
create=True,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] is FlowResultType.CREATE_ENTRY
|
||||
assert result["data"][CONF_API_TYPE] == API_TYPE_DATA_API
|
||||
assert result["data"][CONF_TOKEN]["access_token"] == "mock-access-token"
|
||||
assert result["data"]["auth_implementation"] == DOMAIN
|
||||
assert result["title"] == "mock-user@example.com"
|
||||
assert result["result"].unique_id == "mock-user@example.com"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
|
||||
async def test_data_api_oauth_cannot_connect_abort(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client_no_auth: ClientSessionGenerator,
|
||||
aioclient_mock: AiohttpClientMocker,
|
||||
) -> None:
|
||||
"""Abort the Data API flow when userinfo cannot be retrieved."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_API_TYPE: API_TYPE_DATA_API}
|
||||
)
|
||||
|
||||
authorize_url = result["url"]
|
||||
state = parse_qs(urlparse(authorize_url).query)["state"][0]
|
||||
|
||||
client = await hass_client_no_auth()
|
||||
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
|
||||
assert resp.status == HTTPStatus.OK
|
||||
|
||||
aioclient_mock.post(
|
||||
TOKEN_URL,
|
||||
json={
|
||||
"access_token": "mock-access-token",
|
||||
"refresh_token": "mock-refresh-token",
|
||||
"token_type": "bearer",
|
||||
"expires_in": 3600,
|
||||
},
|
||||
)
|
||||
|
||||
data_api_client = MagicMock()
|
||||
data_api_client.get_userinfo = AsyncMock(side_effect=ClientError("boom"))
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.tibber.config_flow.TibberDataAPI",
|
||||
return_value=data_api_client,
|
||||
create=True,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "cannot_connect"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_credentials")
|
||||
async def test_data_api_abort_when_already_configured(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Ensure only a single Data API entry can be configured."""
|
||||
existing_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
CONF_API_TYPE: API_TYPE_DATA_API,
|
||||
"auth_implementation": DOMAIN,
|
||||
CONF_TOKEN: {"access_token": "existing"},
|
||||
},
|
||||
unique_id="existing@example.com",
|
||||
title="existing@example.com",
|
||||
)
|
||||
existing_entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_API_TYPE: API_TYPE_DATA_API}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
|
||||
async def test_graphql_reauth_updates_entry(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test GraphQL reauth refreshes credentials."""
|
||||
existing_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
CONF_API_TYPE: API_TYPE_GRAPHQL,
|
||||
CONF_ACCESS_TOKEN: "old-token",
|
||||
},
|
||||
unique_id="user-123",
|
||||
title="Old title",
|
||||
)
|
||||
existing_entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={
|
||||
"source": config_entries.SOURCE_REAUTH,
|
||||
"entry_id": existing_entry.entry_id,
|
||||
},
|
||||
data=existing_entry.data,
|
||||
)
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "reauth_confirm"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "graphql"
|
||||
|
||||
tibber_mock = MagicMock()
|
||||
type(tibber_mock).update_info = AsyncMock(return_value=True)
|
||||
type(tibber_mock).user_id = PropertyMock(return_value="user-123")
|
||||
type(tibber_mock).name = PropertyMock(return_value="New title")
|
||||
|
||||
with patch("tibber.Tibber", return_value=tibber_mock):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {CONF_ACCESS_TOKEN: "new-token"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "reauth_successful"
|
||||
updated_entry = hass.config_entries.async_get_entry(existing_entry.entry_id)
|
||||
assert updated_entry is not None
|
||||
assert updated_entry.data[CONF_ACCESS_TOKEN] == "new-token"
|
||||
assert updated_entry.data[CONF_API_TYPE] == API_TYPE_GRAPHQL
|
||||
assert updated_entry.title == "New title"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
|
||||
async def test_data_api_reauth_updates_entry(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client_no_auth: ClientSessionGenerator,
|
||||
aioclient_mock: AiohttpClientMocker,
|
||||
) -> None:
|
||||
"""Test Data API reauth refreshes credentials."""
|
||||
existing_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
CONF_API_TYPE: API_TYPE_DATA_API,
|
||||
"auth_implementation": DOMAIN,
|
||||
CONF_TOKEN: {
|
||||
"access_token": "old-access-token",
|
||||
"refresh_token": "old-refresh-token",
|
||||
},
|
||||
},
|
||||
unique_id="old@example.com",
|
||||
title="old@example.com",
|
||||
)
|
||||
existing_entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={
|
||||
"source": config_entries.SOURCE_REAUTH,
|
||||
"entry_id": existing_entry.entry_id,
|
||||
},
|
||||
data=existing_entry.data,
|
||||
)
|
||||
assert result["type"] is FlowResultType.EXTERNAL_STEP
|
||||
authorize_url = result["url"]
|
||||
state = parse_qs(urlparse(authorize_url).query)["state"][0]
|
||||
|
||||
client = await hass_client_no_auth()
|
||||
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
|
||||
assert resp.status == HTTPStatus.OK
|
||||
|
||||
aioclient_mock.post(
|
||||
TOKEN_URL,
|
||||
json={
|
||||
"access_token": "new-access-token",
|
||||
"refresh_token": "new-refresh-token",
|
||||
"token_type": "bearer",
|
||||
"expires_in": 3600,
|
||||
},
|
||||
)
|
||||
|
||||
data_api_client = MagicMock()
|
||||
data_api_client.get_userinfo = AsyncMock(return_value={"email": "old@example.com"})
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.tibber.config_flow.TibberDataAPI",
|
||||
return_value=data_api_client,
|
||||
create=True,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "reauth_successful"
|
||||
updated_entry = hass.config_entries.async_get_entry(existing_entry.entry_id)
|
||||
assert updated_entry is not None
|
||||
assert updated_entry.data[CONF_TOKEN]["access_token"] == "new-access-token"
|
||||
assert updated_entry.data["auth_implementation"] == DOMAIN
|
||||
assert updated_entry.data[CONF_API_TYPE] == API_TYPE_DATA_API
|
||||
assert updated_entry.title == "old@example.com"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
|
||||
async def test_data_api_reauth_wrong_account_abort(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client_no_auth: ClientSessionGenerator,
|
||||
aioclient_mock: AiohttpClientMocker,
|
||||
) -> None:
|
||||
"""Abort Data API reauth when a different account is returned."""
|
||||
existing_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
CONF_API_TYPE: API_TYPE_DATA_API,
|
||||
"auth_implementation": DOMAIN,
|
||||
CONF_TOKEN: {
|
||||
"access_token": "old-access-token",
|
||||
"refresh_token": "old-refresh-token",
|
||||
},
|
||||
},
|
||||
unique_id="old@example.com",
|
||||
title="old@example.com",
|
||||
)
|
||||
existing_entry.add_to_hass(hass)
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={
|
||||
"source": config_entries.SOURCE_REAUTH,
|
||||
"entry_id": existing_entry.entry_id,
|
||||
},
|
||||
data=existing_entry.data,
|
||||
)
|
||||
|
||||
authorize_url = result["url"]
|
||||
state = parse_qs(urlparse(authorize_url).query)["state"][0]
|
||||
|
||||
client = await hass_client_no_auth()
|
||||
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
|
||||
assert resp.status == HTTPStatus.OK
|
||||
|
||||
aioclient_mock.post(
|
||||
TOKEN_URL,
|
||||
json={
|
||||
"access_token": "new-access-token",
|
||||
"refresh_token": "new-refresh-token",
|
||||
"token_type": "bearer",
|
||||
"expires_in": 3600,
|
||||
},
|
||||
)
|
||||
|
||||
data_api_client = MagicMock()
|
||||
data_api_client.get_userinfo = AsyncMock(
|
||||
return_value={"email": "other@example.com"}
|
||||
)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.tibber.config_flow.TibberDataAPI",
|
||||
return_value=data_api_client,
|
||||
create=True,
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "wrong_account"
|
||||
assert result["description_placeholders"] == {"email": "old@example.com"}
|
||||
|
||||
135
tests/components/tibber/test_data_api.py
Normal file
135
tests/components/tibber/test_data_api.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Tests for the Tibber Data API coordinator and sensors."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import aiohttp
|
||||
import pytest
|
||||
import tibber
|
||||
|
||||
from homeassistant.components.tibber.const import (
|
||||
API_TYPE_DATA_API,
|
||||
CONF_API_TYPE,
|
||||
DOMAIN,
|
||||
)
|
||||
from homeassistant.components.tibber.coordinator import TibberDataAPICoordinator
|
||||
from homeassistant.components.tibber.sensor import (
|
||||
TibberDataAPISensor,
|
||||
_async_setup_data_api_sensors,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
||||
from homeassistant.helpers.update_coordinator import UpdateFailed
|
||||
|
||||
from .conftest import create_tibber_device
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data_api_entry(hass: HomeAssistant) -> MockConfigEntry:
|
||||
"""Create a Data API Tibber config entry."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_ACCESS_TOKEN: "token", CONF_API_TYPE: API_TYPE_DATA_API},
|
||||
unique_id="data-api",
|
||||
)
|
||||
entry.add_to_hass(hass)
|
||||
return entry
|
||||
|
||||
|
||||
async def test_data_api_setup_adds_entities(
|
||||
hass: HomeAssistant,
|
||||
data_api_entry: MockConfigEntry,
|
||||
) -> None:
|
||||
"""Ensure Data API sensors are created and coordinator refreshes data."""
|
||||
runtime = MagicMock()
|
||||
client = MagicMock()
|
||||
client.get_all_devices = AsyncMock(
|
||||
return_value={"device-id": create_tibber_device(value=72.0)}
|
||||
)
|
||||
client.update_devices = AsyncMock(
|
||||
return_value={"device-id": create_tibber_device(value=83.0)}
|
||||
)
|
||||
runtime.async_get_client = AsyncMock(return_value=client)
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})[API_TYPE_DATA_API] = runtime
|
||||
|
||||
added_entities: list[TibberDataAPISensor] = []
|
||||
|
||||
def async_add_entities(entities: list[TibberDataAPISensor]) -> None:
|
||||
added_entities.extend(entities)
|
||||
|
||||
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
|
||||
|
||||
await _async_setup_data_api_sensors(hass, data_api_entry, async_add_entities)
|
||||
|
||||
assert runtime.async_get_client.await_count == 2
|
||||
client.get_all_devices.assert_awaited_once()
|
||||
client.update_devices.assert_awaited_once()
|
||||
|
||||
assert len(added_entities) == 1
|
||||
sensor = added_entities[0]
|
||||
assert sensor.entity_description.key == "storage.stateOfCharge"
|
||||
assert sensor.native_value == 83.0
|
||||
assert sensor.available
|
||||
|
||||
sensor.coordinator.data = {}
|
||||
assert sensor.native_value is None
|
||||
assert not sensor.available
|
||||
|
||||
|
||||
async def test_data_api_coordinator_first_refresh_failure(
|
||||
hass: HomeAssistant, data_api_entry: MockConfigEntry
|
||||
) -> None:
|
||||
"""Ensure network failures during setup raise ConfigEntryNotReady."""
|
||||
runtime = MagicMock()
|
||||
runtime.async_get_client = AsyncMock(side_effect=aiohttp.ClientError("boom"))
|
||||
hass.data.setdefault(DOMAIN, {})[API_TYPE_DATA_API] = runtime
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, data_api_entry, runtime)
|
||||
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
|
||||
|
||||
with pytest.raises(ConfigEntryNotReady):
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
assert isinstance(coordinator.last_exception, UpdateFailed)
|
||||
|
||||
|
||||
async def test_data_api_coordinator_first_refresh_auth_failed(
|
||||
hass: HomeAssistant, data_api_entry: MockConfigEntry
|
||||
) -> None:
|
||||
"""Ensure auth failures during setup propagate."""
|
||||
runtime = MagicMock()
|
||||
runtime.async_get_client = AsyncMock(side_effect=ConfigEntryAuthFailed("invalid"))
|
||||
hass.data.setdefault(DOMAIN, {})[API_TYPE_DATA_API] = runtime
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, data_api_entry, runtime)
|
||||
data_api_entry.mock_state(hass, ConfigEntryState.SETUP_IN_PROGRESS)
|
||||
|
||||
with pytest.raises(ConfigEntryAuthFailed):
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"exception",
|
||||
[
|
||||
aiohttp.ClientError("err"),
|
||||
TimeoutError(),
|
||||
tibber.UserAgentMissingError("err"),
|
||||
],
|
||||
)
|
||||
async def test_data_api_coordinator_update_failures(
|
||||
hass: HomeAssistant, data_api_entry: MockConfigEntry, exception: Exception
|
||||
) -> None:
|
||||
"""Ensure update failures are wrapped in UpdateFailed."""
|
||||
runtime = MagicMock()
|
||||
runtime.async_get_client = AsyncMock(side_effect=exception)
|
||||
hass.data.setdefault(DOMAIN, {})[API_TYPE_DATA_API] = runtime
|
||||
|
||||
coordinator = TibberDataAPICoordinator(hass, data_api_entry, runtime)
|
||||
|
||||
with pytest.raises(UpdateFailed):
|
||||
await coordinator._async_update_data()
|
||||
@@ -1,13 +1,30 @@
|
||||
"""Test the Netatmo diagnostics."""
|
||||
"""Test the Tibber diagnostics."""
|
||||
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import aiohttp
|
||||
import pytest
|
||||
import tibber
|
||||
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.tibber import TibberDataAPIRuntimeData
|
||||
from homeassistant.components.tibber.const import (
|
||||
API_TYPE_DATA_API,
|
||||
API_TYPE_GRAPHQL,
|
||||
CONF_API_TYPE,
|
||||
DOMAIN,
|
||||
)
|
||||
from homeassistant.components.tibber.diagnostics import (
|
||||
async_get_config_entry_diagnostics,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from .conftest import create_tibber_device
|
||||
from .test_common import mock_get_homes
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
from tests.components.diagnostics import get_diagnostics_for_config_entry
|
||||
from tests.typing import ClientSessionGenerator
|
||||
|
||||
@@ -34,6 +51,7 @@ async def test_entry_diagnostics(
|
||||
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
|
||||
|
||||
assert result == {
|
||||
"api_type": API_TYPE_GRAPHQL,
|
||||
"homes": [],
|
||||
}
|
||||
|
||||
@@ -44,6 +62,7 @@ async def test_entry_diagnostics(
|
||||
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
|
||||
|
||||
assert result == {
|
||||
"api_type": API_TYPE_GRAPHQL,
|
||||
"homes": [
|
||||
{
|
||||
"last_data_timestamp": "2016-01-01T12:48:57",
|
||||
@@ -54,3 +73,136 @@ async def test_entry_diagnostics(
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
async def test_data_api_diagnostics_no_runtime(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
) -> None:
|
||||
"""Test Data API diagnostics when runtime is not available."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_API_TYPE: API_TYPE_DATA_API},
|
||||
unique_id="data-api",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
|
||||
result = await async_get_config_entry_diagnostics(hass, config_entry)
|
||||
|
||||
assert result == {
|
||||
"api_type": API_TYPE_DATA_API,
|
||||
"devices": [],
|
||||
}
|
||||
|
||||
|
||||
async def test_data_api_diagnostics_success(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
) -> None:
|
||||
"""Test Data API diagnostics with successful device retrieval."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_API_TYPE: API_TYPE_DATA_API},
|
||||
unique_id="data-api",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
session = MagicMock()
|
||||
session.async_ensure_token_valid = AsyncMock()
|
||||
session.token = "test-token"
|
||||
|
||||
client = MagicMock()
|
||||
client.get_all_devices = AsyncMock(
|
||||
return_value={
|
||||
"device-1": create_tibber_device(
|
||||
device_id="device-1",
|
||||
name="Device 1",
|
||||
brand="Tibber",
|
||||
model="Test Model",
|
||||
),
|
||||
"device-2": create_tibber_device(
|
||||
device_id="device-2",
|
||||
name="Device 2",
|
||||
brand="Tibber",
|
||||
model="Test Model",
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
runtime = TibberDataAPIRuntimeData(session=session)
|
||||
with patch.object(
|
||||
TibberDataAPIRuntimeData,
|
||||
"async_get_client",
|
||||
new_callable=AsyncMock,
|
||||
return_value=client,
|
||||
):
|
||||
hass.data.setdefault(DOMAIN, {})[API_TYPE_DATA_API] = runtime
|
||||
|
||||
result = await async_get_config_entry_diagnostics(hass, config_entry)
|
||||
|
||||
assert result["api_type"] == API_TYPE_DATA_API
|
||||
assert result["error"] is None
|
||||
assert len(result["devices"]) == 2
|
||||
device_ids = {device["id"] for device in result["devices"]}
|
||||
assert device_ids == {"device-1", "device-2"}
|
||||
for device in result["devices"]:
|
||||
assert device["id"] in ("device-1", "device-2")
|
||||
assert device["brand"] == "Tibber"
|
||||
assert device["model"] == "Test Model"
|
||||
if device["id"] == "device-1":
|
||||
assert device["name"] == "Device 1"
|
||||
else:
|
||||
assert device["name"] == "Device 2"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("exception", "expected_error"),
|
||||
[
|
||||
(ConfigEntryAuthFailed("Auth failed"), "Authentication failed"),
|
||||
(TimeoutError(), "Timeout error"),
|
||||
(aiohttp.ClientError("Connection error"), "Client error"),
|
||||
(tibber.InvalidLoginError(401), "Invalid login"),
|
||||
(tibber.RetryableHttpExceptionError(503), "Retryable HTTP error (503)"),
|
||||
(tibber.FatalHttpExceptionError(404), "Fatal HTTP error (404)"),
|
||||
],
|
||||
)
|
||||
async def test_data_api_diagnostics_exceptions(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
exception: Exception,
|
||||
expected_error: str,
|
||||
) -> None:
|
||||
"""Test Data API diagnostics with various exception scenarios."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_API_TYPE: API_TYPE_DATA_API},
|
||||
unique_id="data-api",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
session = MagicMock()
|
||||
session.async_ensure_token_valid = AsyncMock()
|
||||
session.token = "test-token"
|
||||
|
||||
client = MagicMock()
|
||||
client.get_all_devices = AsyncMock(side_effect=exception)
|
||||
|
||||
runtime = TibberDataAPIRuntimeData(session=session)
|
||||
with patch.object(
|
||||
TibberDataAPIRuntimeData,
|
||||
"async_get_client",
|
||||
new_callable=AsyncMock,
|
||||
return_value=client,
|
||||
):
|
||||
hass.data.setdefault(DOMAIN, {})[API_TYPE_DATA_API] = runtime
|
||||
|
||||
result = await async_get_config_entry_diagnostics(hass, config_entry)
|
||||
|
||||
assert result["api_type"] == API_TYPE_DATA_API
|
||||
assert result["error"] == expected_error
|
||||
assert result["devices"] == []
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
"""Test loading of the Tibber config entry."""
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import ANY, AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.tibber import DOMAIN
|
||||
from homeassistant.components.tibber import DOMAIN, TibberDataAPIRuntimeData
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.const import CONF_ACCESS_TOKEN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
|
||||
|
||||
async def test_entry_unload(
|
||||
@@ -19,3 +23,55 @@ async def test_entry_unload(
|
||||
mock_tibber_setup.rt_disconnect.assert_called_once()
|
||||
await hass.async_block_till_done(wait_background_tasks=True)
|
||||
assert entry.state == ConfigEntryState.NOT_LOADED
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_data_api_runtime_creates_client(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Ensure the data API runtime creates and caches the client."""
|
||||
session = MagicMock()
|
||||
session.async_ensure_token_valid = AsyncMock() # type: ignore[assignment]
|
||||
session.token = {CONF_ACCESS_TOKEN: "access-token"}
|
||||
|
||||
runtime = TibberDataAPIRuntimeData(session=session)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.tibber.__init__.tibber_data_api.TibberDataAPI"
|
||||
) as mock_client_cls:
|
||||
mock_client = MagicMock()
|
||||
mock_client.set_access_token = MagicMock()
|
||||
mock_client_cls.return_value = mock_client
|
||||
|
||||
client = await runtime.async_get_client(hass)
|
||||
|
||||
mock_client_cls.assert_called_once_with("access-token", websession=ANY)
|
||||
session.async_ensure_token_valid.assert_awaited_once()
|
||||
mock_client.set_access_token.assert_called_once_with("access-token")
|
||||
assert client is mock_client
|
||||
|
||||
mock_client.set_access_token.reset_mock()
|
||||
session.async_ensure_token_valid.reset_mock()
|
||||
|
||||
cached_client = await runtime.async_get_client(hass)
|
||||
|
||||
mock_client_cls.assert_called_once()
|
||||
session.async_ensure_token_valid.assert_awaited_once()
|
||||
mock_client.set_access_token.assert_called_once_with("access-token")
|
||||
assert cached_client is client
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_data_api_runtime_missing_token_raises(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Ensure missing tokens trigger reauthentication."""
|
||||
session = MagicMock()
|
||||
session.async_ensure_token_valid = AsyncMock() # type: ignore[assignment]
|
||||
session.token = {}
|
||||
|
||||
runtime = TibberDataAPIRuntimeData(session=session)
|
||||
|
||||
with pytest.raises(ConfigEntryAuthFailed):
|
||||
await runtime.async_get_client(hass)
|
||||
session.async_ensure_token_valid.assert_awaited_once()
|
||||
|
||||
Reference in New Issue
Block a user