Compare commits

..

4 Commits

Author SHA1 Message Date
Franck Nijhof
8f2b1f0eff Bump version to 2026.1.0b0 2025-12-29 19:01:17 +00:00
Joost Lekkerkerker
a1a1d65ee4 Add Hood fan speed select entity to SmartThings (#157841)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-12-29 19:56:55 +01:00
Louis Christ
8778d4c704 Move actions to async_setup in bluesound (#159809) 2025-12-29 19:44:05 +01:00
Jeremiah Paige
7790a2ebdd Add config flow to wsdot (#149208)
Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-12-29 18:58:09 +01:00
44 changed files with 1465 additions and 1284 deletions

View File

@@ -2,16 +2,25 @@
from pyblu import Player
from pyblu.errors import PlayerUnreachableError
import voluptuous as vol
from homeassistant.components.media_player import DOMAIN as MEDIA_PLAYER_DOMAIN
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, CONF_PORT, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers import config_validation as cv, service
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
from .const import (
ATTR_MASTER,
DOMAIN,
SERVICE_CLEAR_TIMER,
SERVICE_JOIN,
SERVICE_SET_TIMER,
SERVICE_UNJOIN,
)
from .coordinator import (
BluesoundConfigEntry,
BluesoundCoordinator,
@@ -28,6 +37,38 @@ PLATFORMS = [
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Bluesound."""
service.async_register_platform_entity_service(
hass,
DOMAIN,
SERVICE_SET_TIMER,
entity_domain=MEDIA_PLAYER_DOMAIN,
schema=None,
func="async_increase_timer",
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
SERVICE_CLEAR_TIMER,
entity_domain=MEDIA_PLAYER_DOMAIN,
schema=None,
func="async_clear_timer",
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
SERVICE_JOIN,
entity_domain=MEDIA_PLAYER_DOMAIN,
schema={vol.Required(ATTR_MASTER): cv.entity_id},
func="async_bluesound_join",
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
SERVICE_UNJOIN,
entity_domain=MEDIA_PLAYER_DOMAIN,
schema=None,
func="async_bluesound_unjoin",
)
return True

View File

@@ -4,3 +4,8 @@ DOMAIN = "bluesound"
INTEGRATION_TITLE = "Bluesound"
ATTR_BLUESOUND_GROUP = "bluesound_group"
ATTR_MASTER = "master"
SERVICE_CLEAR_TIMER = "clear_sleep_timer"
SERVICE_JOIN = "join"
SERVICE_SET_TIMER = "set_sleep_timer"
SERVICE_UNJOIN = "unjoin"

View File

@@ -8,7 +8,6 @@ import logging
from typing import TYPE_CHECKING, Any
from pyblu import Input, Player, Preset, Status, SyncStatus
import voluptuous as vol
from homeassistant.components import media_source
from homeassistant.components.media_player import (
@@ -22,12 +21,7 @@ from homeassistant.components.media_player import (
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers import (
config_validation as cv,
entity_platform,
entity_registry as er,
issue_registry as ir,
)
from homeassistant.helpers import entity_registry as er, issue_registry as ir
from homeassistant.helpers.device_registry import (
CONNECTION_NETWORK_MAC,
DeviceInfo,
@@ -41,7 +35,15 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import dt as dt_util, slugify
from .const import ATTR_BLUESOUND_GROUP, ATTR_MASTER, DOMAIN
from .const import (
ATTR_BLUESOUND_GROUP,
ATTR_MASTER,
DOMAIN,
SERVICE_CLEAR_TIMER,
SERVICE_JOIN,
SERVICE_SET_TIMER,
SERVICE_UNJOIN,
)
from .coordinator import BluesoundCoordinator
from .utils import (
dispatcher_join_signal,
@@ -60,11 +62,6 @@ SCAN_INTERVAL = timedelta(minutes=15)
DATA_BLUESOUND = DOMAIN
DEFAULT_PORT = 11000
SERVICE_CLEAR_TIMER = "clear_sleep_timer"
SERVICE_JOIN = "join"
SERVICE_SET_TIMER = "set_sleep_timer"
SERVICE_UNJOIN = "unjoin"
POLL_TIMEOUT = 120
@@ -81,20 +78,6 @@ async def async_setup_entry(
config_entry.runtime_data.player,
)
platform = entity_platform.async_get_current_platform()
platform.async_register_entity_service(
SERVICE_SET_TIMER, None, "async_increase_timer"
)
platform.async_register_entity_service(
SERVICE_CLEAR_TIMER, None, "async_clear_timer"
)
platform.async_register_entity_service(
SERVICE_JOIN, {vol.Required(ATTR_MASTER): cv.entity_id}, "async_bluesound_join"
)
platform.async_register_entity_service(
SERVICE_UNJOIN, None, "async_bluesound_unjoin"
)
async_add_entities([bluesound_player], update_before_add=True)

View File

@@ -11,6 +11,8 @@ from homeassistant.components.fan import FanEntity, FanEntityFeature
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.util.percentage import (
ordered_list_item_to_percentage,
percentage_to_ordered_list_item,
percentage_to_ranged_value,
ranged_value_to_percentage,
)
@@ -22,6 +24,9 @@ from .entity import SmartThingsEntity
SPEED_RANGE = (1, 3) # off is not included
SMART = 14
PRESET_SMART = "smart"
async def async_setup_entry(
hass: HomeAssistant,
@@ -30,7 +35,7 @@ async def async_setup_entry(
) -> None:
"""Add fans for a config entry."""
entry_data = entry.runtime_data
async_add_entities(
entities: list[FanEntity] = [
SmartThingsFan(entry_data.client, device)
for device in entry_data.devices.values()
if Capability.SWITCH in device.status[MAIN]
@@ -42,7 +47,20 @@ async def async_setup_entry(
)
)
and Capability.THERMOSTAT_COOLING_SETPOINT not in device.status[MAIN]
]
entities.extend(
SmartThingsHood(entry_data.client, device)
for device in entry_data.devices.values()
if Capability.SWITCH in device.status[MAIN]
and Capability.SAMSUNG_CE_HOOD_FAN_SPEED in device.status[MAIN]
and (
device.status[MAIN][Capability.SAMSUNG_CE_HOOD_FAN_SPEED][
Attribute.SETTABLE_MIN_FAN_SPEED
].value
== SMART
)
)
async_add_entities(entities)
class SmartThingsFan(SmartThingsEntity, FanEntity):
@@ -149,3 +167,103 @@ class SmartThingsFan(SmartThingsEntity, FanEntity):
return self.get_attribute_value(
Capability.AIR_CONDITIONER_FAN_MODE, Attribute.SUPPORTED_AC_FAN_MODES
)
class SmartThingsHood(SmartThingsEntity, FanEntity):
"""Define a SmartThings Hood."""
_attr_name = None
_attr_supported_features = (
FanEntityFeature.TURN_ON
| FanEntityFeature.TURN_OFF
| FanEntityFeature.PRESET_MODE
| FanEntityFeature.SET_SPEED
)
_attr_preset_modes = [PRESET_SMART]
_attr_translation_key = "hood"
def __init__(self, client: SmartThings, device: FullDevice) -> None:
"""Init the class."""
super().__init__(
client,
device,
{
Capability.SWITCH,
Capability.SAMSUNG_CE_HOOD_FAN_SPEED,
},
)
@property
def fan_speeds(self) -> list[int]:
"""Return a list of available fan speeds."""
return [
speed
for speed in self.get_attribute_value(
Capability.SAMSUNG_CE_HOOD_FAN_SPEED, Attribute.SUPPORTED_HOOD_FAN_SPEED
)
if speed != SMART
]
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set the preset_mode of the fan."""
await self.execute_device_command(
Capability.SAMSUNG_CE_HOOD_FAN_SPEED,
Command.SET_HOOD_FAN_SPEED,
argument=SMART,
)
async def async_set_percentage(self, percentage: int) -> None:
"""Set the speed percentage of the fan."""
if percentage == 0:
await self.execute_device_command(Capability.SWITCH, Command.OFF)
else:
await self.execute_device_command(
Capability.SAMSUNG_CE_HOOD_FAN_SPEED,
Command.SET_HOOD_FAN_SPEED,
argument=percentage_to_ordered_list_item(self.fan_speeds, percentage),
)
async def async_turn_on(
self,
percentage: int | None = None,
preset_mode: str | None = None,
**kwargs: Any,
) -> None:
"""Turn the fan on."""
await self.execute_device_command(Capability.SWITCH, Command.ON)
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the fan off."""
await self.execute_device_command(Capability.SWITCH, Command.OFF)
@property
def is_on(self) -> bool:
"""Return true if fan is on."""
return self.get_attribute_value(Capability.SWITCH, Attribute.SWITCH) == "on"
@property
def preset_mode(self) -> str | None:
"""Return the current preset mode."""
if (
self.get_attribute_value(
Capability.SAMSUNG_CE_HOOD_FAN_SPEED, Attribute.HOOD_FAN_SPEED
)
== SMART
):
return PRESET_SMART
return None
@property
def percentage(self) -> int | None:
"""Return the current speed percentage."""
fan_speed = self.get_attribute_value(
Capability.SAMSUNG_CE_HOOD_FAN_SPEED, Attribute.HOOD_FAN_SPEED
)
if fan_speed == SMART:
return None
return ordered_list_item_to_percentage(self.fan_speeds, fan_speed)
@property
def speed_count(self) -> int:
"""Return the number of available speeds."""
return len(self.fan_speeds)

View File

@@ -53,6 +53,17 @@
}
}
},
"fan": {
"hood": {
"state_attributes": {
"preset_mode": {
"state": {
"smart": "mdi:brain"
}
}
}
}
},
"number": {
"freezer_temperature": {
"default": "mdi:snowflake-thermometer"

View File

@@ -138,6 +138,17 @@
}
}
},
"fan": {
"hood": {
"state_attributes": {
"preset_mode": {
"state": {
"smart": "Smart"
}
}
}
}
},
"number": {
"cool_select_plus_temperature": {
"name": "CoolSelect+ temperature"

View File

@@ -1,36 +1,20 @@
"""Support for Tibber."""
from __future__ import annotations
from dataclasses import dataclass, field
import logging
import aiohttp
from aiohttp.client_exceptions import ClientError, ClientResponseError
import tibber
from tibber import data_api as tibber_data_api
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN, EVENT_HOMEASSISTANT_STOP, Platform
from homeassistant.core import Event, HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.config_entry_oauth2_flow import (
ImplementationUnavailableError,
OAuth2Session,
async_get_config_entry_implementation,
)
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import dt as dt_util, ssl as ssl_util
from .const import (
AUTH_IMPLEMENTATION,
CONF_LEGACY_ACCESS_TOKEN,
DATA_HASS_CONFIG,
DOMAIN,
TibberConfigEntry,
)
from .coordinator import TibberDataAPICoordinator
from .const import DATA_HASS_CONFIG, DOMAIN
from .services import async_setup_services
PLATFORMS = [Platform.NOTIFY, Platform.SENSOR]
@@ -40,33 +24,6 @@ CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
_LOGGER = logging.getLogger(__name__)
@dataclass
class TibberRuntimeData:
"""Runtime data for Tibber API entries."""
tibber_connection: tibber.Tibber
session: OAuth2Session
data_api_coordinator: TibberDataAPICoordinator | None = field(default=None)
_client: tibber_data_api.TibberDataAPI | None = None
async def async_get_client(
self, hass: HomeAssistant
) -> tibber_data_api.TibberDataAPI:
"""Return an authenticated Tibber Data API client."""
await self.session.async_ensure_token_valid()
token = self.session.token
access_token = token.get(CONF_ACCESS_TOKEN)
if not access_token:
raise ConfigEntryAuthFailed("Access token missing from OAuth session")
if self._client is None:
self._client = tibber_data_api.TibberDataAPI(
access_token,
websession=async_get_clientsession(hass),
)
self._client.set_access_token(access_token)
return self._client
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Tibber component."""
@@ -77,23 +34,16 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True
async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a config entry."""
# Added in 2026.1 to migrate existing users to OAuth2 (Tibber Data API).
# Can be removed after 2026.7
if AUTH_IMPLEMENTATION not in entry.data:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="data_api_reauth_required",
)
tibber_connection = tibber.Tibber(
access_token=entry.data[CONF_LEGACY_ACCESS_TOKEN],
access_token=entry.data[CONF_ACCESS_TOKEN],
websession=async_get_clientsession(hass),
time_zone=dt_util.get_default_time_zone(),
ssl=ssl_util.get_default_context(),
)
hass.data[DOMAIN] = tibber_connection
async def _close(event: Event) -> None:
await tibber_connection.rt_disconnect()
@@ -102,6 +52,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bo
try:
await tibber_connection.update_info()
except (
TimeoutError,
aiohttp.ClientError,
@@ -114,45 +65,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: TibberConfigEntry) -> bo
except tibber.FatalHttpExceptionError:
return False
try:
implementation = await async_get_config_entry_implementation(hass, entry)
except ImplementationUnavailableError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="oauth2_implementation_unavailable",
) from err
session = OAuth2Session(hass, entry, implementation)
try:
await session.async_ensure_token_valid()
except ClientResponseError as err:
if 400 <= err.status < 500:
raise ConfigEntryAuthFailed(
"OAuth session is not valid, reauthentication required"
) from err
raise ConfigEntryNotReady from err
except ClientError as err:
raise ConfigEntryNotReady from err
entry.runtime_data = TibberRuntimeData(
tibber_connection=tibber_connection,
session=session,
)
coordinator = TibberDataAPICoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data.data_api_coordinator = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(
hass: HomeAssistant, config_entry: TibberConfigEntry
) -> bool:
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(
unload_ok = await hass.config_entries.async_unload_platforms(
config_entry, PLATFORMS
):
await config_entry.runtime_data.tibber_connection.rt_disconnect()
)
if unload_ok:
tibber_connection = hass.data[DOMAIN]
await tibber_connection.rt_disconnect()
return unload_ok

View File

@@ -1,15 +0,0 @@
"""Application credentials platform for Tibber."""
from homeassistant.components.application_credentials import AuthorizationServer
from homeassistant.core import HomeAssistant
AUTHORIZE_URL = "https://thewall.tibber.com/connect/authorize"
TOKEN_URL = "https://thewall.tibber.com/connect/token"
async def async_get_authorization_server(hass: HomeAssistant) -> AuthorizationServer:
"""Return authorization server for Tibber Data API."""
return AuthorizationServer(
authorize_url=AUTHORIZE_URL,
token_url=TOKEN_URL,
)

View File

@@ -2,164 +2,80 @@
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
import aiohttp
import tibber
from tibber import data_api as tibber_data_api
import voluptuous as vol
from homeassistant.config_entries import SOURCE_REAUTH, SOURCE_USER, ConfigFlowResult
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.config_entry_oauth2_flow import AbstractOAuth2FlowHandler
from .const import CONF_LEGACY_ACCESS_TOKEN, DATA_API_DEFAULT_SCOPES, DOMAIN
from .const import DOMAIN
DATA_SCHEMA = vol.Schema({vol.Required(CONF_LEGACY_ACCESS_TOKEN): str})
DATA_SCHEMA = vol.Schema({vol.Required(CONF_ACCESS_TOKEN): str})
ERR_TIMEOUT = "timeout"
ERR_CLIENT = "cannot_connect"
ERR_TOKEN = "invalid_access_token"
TOKEN_URL = "https://developer.tibber.com/settings/access-token"
_LOGGER = logging.getLogger(__name__)
class TibberConfigFlow(AbstractOAuth2FlowHandler, domain=DOMAIN):
class TibberConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Tibber integration."""
VERSION = 1
DOMAIN = DOMAIN
def __init__(self) -> None:
"""Initialize the config flow."""
super().__init__()
self._access_token: str | None = None
self._title = ""
@property
def logger(self) -> logging.Logger:
"""Return the logger."""
return _LOGGER
@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data appended to the authorize URL."""
return {
**super().extra_authorize_data,
"scope": " ".join(DATA_API_DEFAULT_SCOPES),
}
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
if user_input is None:
data_schema = self.add_suggested_values_to_schema(
DATA_SCHEMA, {CONF_LEGACY_ACCESS_TOKEN: self._access_token or ""}
self._async_abort_entries_match()
if user_input is not None:
access_token = user_input[CONF_ACCESS_TOKEN].replace(" ", "")
tibber_connection = tibber.Tibber(
access_token=access_token,
websession=async_get_clientsession(self.hass),
)
return self.async_show_form(
step_id=SOURCE_USER,
data_schema=data_schema,
description_placeholders={"url": TOKEN_URL},
errors={},
)
errors = {}
self._access_token = user_input[CONF_LEGACY_ACCESS_TOKEN].replace(" ", "")
tibber_connection = tibber.Tibber(
access_token=self._access_token,
websession=async_get_clientsession(self.hass),
)
self._title = tibber_connection.name or "Tibber"
try:
await tibber_connection.update_info()
except TimeoutError:
errors[CONF_ACCESS_TOKEN] = ERR_TIMEOUT
except tibber.InvalidLoginError:
errors[CONF_ACCESS_TOKEN] = ERR_TOKEN
except (
aiohttp.ClientError,
tibber.RetryableHttpExceptionError,
tibber.FatalHttpExceptionError,
):
errors[CONF_ACCESS_TOKEN] = ERR_CLIENT
errors: dict[str, str] = {}
try:
await tibber_connection.update_info()
except TimeoutError:
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_TIMEOUT
except tibber.InvalidLoginError:
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_TOKEN
except (
aiohttp.ClientError,
tibber.RetryableHttpExceptionError,
tibber.FatalHttpExceptionError,
):
errors[CONF_LEGACY_ACCESS_TOKEN] = ERR_CLIENT
if errors:
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
description_placeholders={"url": TOKEN_URL},
errors=errors,
)
if errors:
data_schema = self.add_suggested_values_to_schema(
DATA_SCHEMA, {CONF_LEGACY_ACCESS_TOKEN: self._access_token or ""}
)
return self.async_show_form(
step_id=SOURCE_USER,
data_schema=data_schema,
description_placeholders={"url": TOKEN_URL},
errors=errors,
)
await self.async_set_unique_id(tibber_connection.user_id)
if self.source == SOURCE_REAUTH:
reauth_entry = self._get_reauth_entry()
self._abort_if_unique_id_mismatch(
reason="wrong_account",
description_placeholders={"title": reauth_entry.title},
)
else:
unique_id = tibber_connection.user_id
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
return await self.async_step_pick_implementation()
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Handle a reauth flow."""
reauth_entry = self._get_reauth_entry()
self._access_token = reauth_entry.data.get(CONF_LEGACY_ACCESS_TOKEN)
self._title = reauth_entry.title
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm reauthentication by reusing the user step."""
reauth_entry = self._get_reauth_entry()
self._access_token = reauth_entry.data.get(CONF_LEGACY_ACCESS_TOKEN)
self._title = reauth_entry.title
if user_input is None:
return self.async_show_form(
step_id="reauth_confirm",
return self.async_create_entry(
title=tibber_connection.name,
data={CONF_ACCESS_TOKEN: access_token},
)
return await self.async_step_user()
async def async_oauth_create_entry(self, data: dict) -> ConfigFlowResult:
"""Finalize the OAuth flow and create the config entry."""
if self._access_token is None:
return self.async_abort(reason="missing_configuration")
data[CONF_LEGACY_ACCESS_TOKEN] = self._access_token
access_token = data[CONF_TOKEN][CONF_ACCESS_TOKEN]
data_api_client = tibber_data_api.TibberDataAPI(
access_token,
websession=async_get_clientsession(self.hass),
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
description_placeholders={"url": TOKEN_URL},
errors={},
)
try:
await data_api_client.get_userinfo()
except (aiohttp.ClientError, TimeoutError):
return self.async_abort(reason="cannot_connect")
if self.source == SOURCE_REAUTH:
reauth_entry = self._get_reauth_entry()
return self.async_update_reload_and_abort(
reauth_entry,
data=data,
title=self._title,
)
return self.async_create_entry(title=self._title, data=data)

View File

@@ -1,34 +1,5 @@
"""Constants for Tibber integration."""
from __future__ import annotations
from typing import TYPE_CHECKING
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN
if TYPE_CHECKING:
from . import TibberRuntimeData
type TibberConfigEntry = ConfigEntry[TibberRuntimeData]
CONF_LEGACY_ACCESS_TOKEN = CONF_ACCESS_TOKEN
AUTH_IMPLEMENTATION = "auth_implementation"
DATA_HASS_CONFIG = "tibber_hass_config"
DOMAIN = "tibber"
MANUFACTURER = "Tibber"
DATA_API_DEFAULT_SCOPES = [
"openid",
"profile",
"email",
"offline_access",
"data-api-user-read",
"data-api-chargers-read",
"data-api-energy-systems-read",
"data-api-homes-read",
"data-api-thermostats-read",
"data-api-vehicles-read",
"data-api-inverters-read",
]

View File

@@ -4,11 +4,9 @@ from __future__ import annotations
from datetime import timedelta
import logging
from typing import TYPE_CHECKING, cast
from typing import cast
from aiohttp.client_exceptions import ClientError
import tibber
from tibber.data_api import TibberDataAPI, TibberDevice
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import (
@@ -21,18 +19,15 @@ from homeassistant.components.recorder.statistics import (
get_last_statistics,
statistics_during_period,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfEnergy
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from homeassistant.util.unit_conversion import EnergyConverter
from .const import DOMAIN
if TYPE_CHECKING:
from .const import TibberConfigEntry
FIVE_YEARS = 5 * 365 * 24
_LOGGER = logging.getLogger(__name__)
@@ -41,12 +36,12 @@ _LOGGER = logging.getLogger(__name__)
class TibberDataCoordinator(DataUpdateCoordinator[None]):
"""Handle Tibber data and insert statistics."""
config_entry: TibberConfigEntry
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: TibberConfigEntry,
config_entry: ConfigEntry,
tibber_connection: tibber.Tibber,
) -> None:
"""Initialize the data handler."""
@@ -192,64 +187,3 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
unit_of_measurement=unit,
)
async_add_external_statistics(self.hass, metadata, statistics)
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
"""Fetch and cache Tibber Data API device capabilities."""
config_entry: TibberConfigEntry
def __init__(
self,
hass: HomeAssistant,
entry: TibberConfigEntry,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name=f"{DOMAIN} Data API",
update_interval=timedelta(minutes=1),
config_entry=entry,
)
self._runtime_data = entry.runtime_data
self.sensors_by_device: dict[str, dict[str, tibber.data_api.Sensor]] = {}
def _build_sensor_lookup(self, devices: dict[str, TibberDevice]) -> None:
"""Build sensor lookup dict for efficient access."""
self.sensors_by_device = {
device_id: {sensor.id: sensor for sensor in device.sensors}
for device_id, device in devices.items()
}
def get_sensor(
self, device_id: str, sensor_id: str
) -> tibber.data_api.Sensor | None:
"""Get a sensor by device and sensor ID."""
if device_sensors := self.sensors_by_device.get(device_id):
return device_sensors.get(sensor_id)
return None
async def _async_get_client(self) -> TibberDataAPI:
"""Get the Tibber Data API client with error handling."""
try:
return await self._runtime_data.async_get_client(self.hass)
except ConfigEntryAuthFailed:
raise
except (ClientError, TimeoutError, tibber.UserAgentMissingError) as err:
raise UpdateFailed(
f"Unable to create Tibber Data API client: {err}"
) from err
async def _async_setup(self) -> None:
"""Initial load of Tibber Data API devices."""
client = await self._async_get_client()
devices = await client.get_all_devices()
self._build_sensor_lookup(devices)
async def _async_update_data(self) -> dict[str, TibberDevice]:
"""Fetch the latest device capabilities from the Tibber Data API."""
client = await self._async_get_client()
devices: dict[str, TibberDevice] = await client.update_devices()
self._build_sensor_lookup(devices)
return devices

View File

@@ -4,18 +4,21 @@ from __future__ import annotations
from typing import Any
import tibber
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import TibberConfigEntry
from .const import DOMAIN
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, config_entry: TibberConfigEntry
hass: HomeAssistant, config_entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
tibber_connection: tibber.Tibber = hass.data[DOMAIN]
runtime = config_entry.runtime_data
result: dict[str, Any] = {
return {
"homes": [
{
"last_data_timestamp": home.last_data_timestamp,
@@ -24,24 +27,6 @@ async def async_get_config_entry_diagnostics(
"last_cons_data_timestamp": home.last_cons_data_timestamp,
"country": home.country,
}
for home in runtime.tibber_connection.get_homes(only_active=False)
for home in tibber_connection.get_homes(only_active=False)
]
}
devices = (
runtime.data_api_coordinator.data
if runtime.data_api_coordinator is not None
else {}
) or {}
result["devices"] = [
{
"id": device.id,
"name": device.name,
"brand": device.brand,
"model": device.model,
}
for device in devices.values()
]
return result

View File

@@ -3,9 +3,9 @@
"name": "Tibber",
"codeowners": ["@danielhiversen"],
"config_flow": true,
"dependencies": ["application_credentials", "recorder"],
"dependencies": ["recorder"],
"documentation": "https://www.home-assistant.io/integrations/tibber",
"iot_class": "cloud_polling",
"loggers": ["tibber"],
"requirements": ["pyTibber==0.33.1"]
"requirements": ["pyTibber==0.32.2"]
}

View File

@@ -2,25 +2,28 @@
from __future__ import annotations
from tibber import Tibber
from homeassistant.components.notify import (
ATTR_TITLE_DEFAULT,
NotifyEntity,
NotifyEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import DOMAIN, TibberConfigEntry
from . import DOMAIN
async def async_setup_entry(
hass: HomeAssistant,
entry: TibberConfigEntry,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Tibber notification entity."""
async_add_entities([TibberNotificationEntity(entry)])
async_add_entities([TibberNotificationEntity(entry.entry_id)])
class TibberNotificationEntity(NotifyEntity):
@@ -30,14 +33,13 @@ class TibberNotificationEntity(NotifyEntity):
_attr_name = DOMAIN
_attr_icon = "mdi:message-flash"
def __init__(self, entry: TibberConfigEntry) -> None:
def __init__(self, unique_id: str) -> None:
"""Initialize Tibber notify entity."""
self._attr_unique_id = entry.entry_id
self._entry = entry
self._attr_unique_id = unique_id
async def async_send_message(self, message: str, title: str | None = None) -> None:
"""Send a message to Tibber devices."""
tibber_connection = self._entry.runtime_data.tibber_connection
tibber_connection: Tibber = self.hass.data[DOMAIN]
try:
await tibber_connection.send_notification(
title or ATTR_TITLE_DEFAULT, message

View File

@@ -10,8 +10,7 @@ from random import randrange
from typing import Any
import aiohttp
from tibber import FatalHttpExceptionError, RetryableHttpExceptionError, TibberHome
from tibber.data_api import TibberDevice
import tibber
from homeassistant.components.sensor import (
SensorDeviceClass,
@@ -28,7 +27,6 @@ from homeassistant.const import (
UnitOfElectricCurrent,
UnitOfElectricPotential,
UnitOfEnergy,
UnitOfLength,
UnitOfPower,
)
from homeassistant.core import Event, HomeAssistant, callback
@@ -43,8 +41,8 @@ from homeassistant.helpers.update_coordinator import (
)
from homeassistant.util import Throttle, dt as dt_util
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
from .const import DOMAIN, MANUFACTURER
from .coordinator import TibberDataCoordinator
_LOGGER = logging.getLogger(__name__)
@@ -262,65 +260,14 @@ SENSORS: tuple[SensorEntityDescription, ...] = (
)
DATA_API_SENSORS: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
key="storage.stateOfCharge",
translation_key="storage_state_of_charge",
device_class=SensorDeviceClass.BATTERY,
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
SensorEntityDescription(
key="storage.targetStateOfCharge",
translation_key="storage_target_state_of_charge",
device_class=SensorDeviceClass.BATTERY,
native_unit_of_measurement=PERCENTAGE,
state_class=SensorStateClass.MEASUREMENT,
),
SensorEntityDescription(
key="range.remaining",
translation_key="range_remaining",
device_class=SensorDeviceClass.DISTANCE,
native_unit_of_measurement=UnitOfLength.KILOMETERS,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=1,
),
SensorEntityDescription(
key="charging.current.max",
translation_key="charging_current_max",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
),
SensorEntityDescription(
key="charging.current.offlineFallback",
translation_key="charging_current_offline_fallback",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: TibberConfigEntry,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Tibber sensor."""
_setup_data_api_sensors(entry, async_add_entities)
await _async_setup_graphql_sensors(hass, entry, async_add_entities)
async def _async_setup_graphql_sensors(
hass: HomeAssistant,
entry: TibberConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Tibber sensor."""
tibber_connection = entry.runtime_data.tibber_connection
tibber_connection = hass.data[DOMAIN]
entity_registry = er.async_get(hass)
device_registry = dr.async_get(hass)
@@ -333,11 +280,7 @@ async def _async_setup_graphql_sensors(
except TimeoutError as err:
_LOGGER.error("Timeout connecting to Tibber home: %s ", err)
raise PlatformNotReady from err
except (
RetryableHttpExceptionError,
FatalHttpExceptionError,
aiohttp.ClientError,
) as err:
except (tibber.RetryableHttpExceptionError, aiohttp.ClientError) as err:
_LOGGER.error("Error connecting to Tibber home: %s ", err)
raise PlatformNotReady from err
@@ -382,67 +325,7 @@ async def _async_setup_graphql_sensors(
device_entry.id, new_identifiers={(DOMAIN, home.home_id)}
)
async_add_entities(entities)
def _setup_data_api_sensors(
entry: TibberConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up sensors backed by the Tibber Data API."""
coordinator = entry.runtime_data.data_api_coordinator
if coordinator is None:
return
entities: list[TibberDataAPISensor] = []
api_sensors = {sensor.key: sensor for sensor in DATA_API_SENSORS}
for device in coordinator.data.values():
for sensor in device.sensors:
description: SensorEntityDescription | None = api_sensors.get(sensor.id)
if description is None:
_LOGGER.debug(
"Sensor %s not found in DATA_API_SENSORS, skipping", sensor
)
continue
entities.append(TibberDataAPISensor(coordinator, device, description))
async_add_entities(entities)
class TibberDataAPISensor(CoordinatorEntity[TibberDataAPICoordinator], SensorEntity):
"""Representation of a Tibber Data API capability sensor."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: TibberDataAPICoordinator,
device: TibberDevice,
entity_description: SensorEntityDescription,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self._device_id: str = device.id
self.entity_description = entity_description
self._attr_translation_key = entity_description.translation_key
self._attr_unique_id = f"{device.external_id}_{self.entity_description.key}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, device.external_id)},
name=device.name,
manufacturer=device.brand,
model=device.model,
)
@property
def native_value(self) -> StateType:
"""Return the value reported by the device."""
sensors = self.coordinator.sensors_by_device.get(self._device_id, {})
sensor = sensors.get(self.entity_description.key)
return sensor.value if sensor else None
async_add_entities(entities, True)
class TibberSensor(SensorEntity):
@@ -450,7 +333,9 @@ class TibberSensor(SensorEntity):
_attr_has_entity_name = True
def __init__(self, *args: Any, tibber_home: TibberHome, **kwargs: Any) -> None:
def __init__(
self, *args: Any, tibber_home: tibber.TibberHome, **kwargs: Any
) -> None:
"""Initialize the sensor."""
super().__init__(*args, **kwargs)
self._tibber_home = tibber_home
@@ -481,7 +366,7 @@ class TibberSensorElPrice(TibberSensor):
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_translation_key = "electricity_price"
def __init__(self, tibber_home: TibberHome) -> None:
def __init__(self, tibber_home: tibber.TibberHome) -> None:
"""Initialize the sensor."""
super().__init__(tibber_home=tibber_home)
self._last_updated: datetime.datetime | None = None
@@ -558,7 +443,7 @@ class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
def __init__(
self,
tibber_home: TibberHome,
tibber_home: tibber.TibberHome,
coordinator: TibberDataCoordinator,
entity_description: SensorEntityDescription,
) -> None:
@@ -585,7 +470,7 @@ class TibberSensorRT(TibberSensor, CoordinatorEntity["TibberRtDataCoordinator"])
def __init__(
self,
tibber_home: TibberHome,
tibber_home: tibber.TibberHome,
description: SensorEntityDescription,
initial_state: float,
coordinator: TibberRtDataCoordinator,
@@ -647,7 +532,7 @@ class TibberRtEntityCreator:
def __init__(
self,
async_add_entities: AddConfigEntryEntitiesCallback,
tibber_home: TibberHome,
tibber_home: tibber.TibberHome,
entity_registry: er.EntityRegistry,
) -> None:
"""Initialize the data handler."""
@@ -733,7 +618,7 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en
hass: HomeAssistant,
config_entry: ConfigEntry,
add_sensor_callback: Callable[[TibberRtDataCoordinator, Any], None],
tibber_home: TibberHome,
tibber_home: tibber.TibberHome,
) -> None:
"""Initialize the data handler."""
self._add_sensor_callback = add_sensor_callback

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import datetime as dt
from datetime import datetime
from typing import TYPE_CHECKING, Any, Final
from typing import Any, Final
import voluptuous as vol
@@ -20,9 +20,6 @@ from homeassistant.util import dt as dt_util
from .const import DOMAIN
if TYPE_CHECKING:
from .const import TibberConfigEntry
PRICE_SERVICE_NAME = "get_prices"
ATTR_START: Final = "start"
ATTR_END: Final = "end"
@@ -36,13 +33,7 @@ SERVICE_SCHEMA: Final = vol.Schema(
async def __get_prices(call: ServiceCall) -> ServiceResponse:
entries: list[TibberConfigEntry] = call.hass.config_entries.async_entries(DOMAIN)
if not entries:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="no_config_entry",
)
tibber_connection = entries[0].runtime_data.tibber_connection
tibber_connection = call.hass.data[DOMAIN]
start = __get_date(call.data.get(ATTR_START), "start")
end = __get_date(call.data.get(ATTR_END), "end")
@@ -66,7 +57,7 @@ async def __get_prices(call: ServiceCall) -> ServiceResponse:
selected_data = [
price
for price in price_data
if start <= dt.datetime.fromisoformat(str(price["start_time"])) < end
if start <= dt.datetime.fromisoformat(price["start_time"]) < end
]
tibber_prices[home_nickname] = selected_data

View File

@@ -1,11 +1,7 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
"missing_configuration": "[%key:common::config_flow::abort::oauth2_missing_configuration%]",
"missing_credentials": "[%key:common::config_flow::abort::oauth2_missing_credentials%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
"wrong_account": "The connected account does not match {title}. Sign in with the same Tibber account and try again."
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
@@ -13,10 +9,6 @@
"timeout": "[%key:common::config_flow::error::timeout_connect%]"
},
"step": {
"reauth_confirm": {
"description": "Reconnect your Tibber account to refresh access.",
"title": "[%key:common::config_flow::title::reauth%]"
},
"user": {
"data": {
"access_token": "[%key:common::config_flow::data::access_token%]"
@@ -48,12 +40,6 @@
"average_power": {
"name": "Average power"
},
"charging_current_max": {
"name": "Maximum allowed charge current"
},
"charging_current_offline_fallback": {
"name": "Fallback current if charger goes offline"
},
"current_l1": {
"name": "Current L1"
},
@@ -102,18 +88,9 @@
"power_production": {
"name": "Power production"
},
"range_remaining": {
"name": "Estimated remaining driving range"
},
"signal_strength": {
"name": "Signal strength"
},
"storage_state_of_charge": {
"name": "State of charge"
},
"storage_target_state_of_charge": {
"name": "Target state of charge"
},
"voltage_phase1": {
"name": "Voltage phase1"
},
@@ -126,18 +103,9 @@
}
},
"exceptions": {
"data_api_reauth_required": {
"message": "Reconnect Tibber so Home Assistant can enable the new Tibber Data API features."
},
"invalid_date": {
"message": "Invalid datetime provided {date}"
},
"no_config_entry": {
"message": "No Tibber integration configured"
},
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
},
"send_message_timeout": {
"message": "Timeout sending message with Tibber"
}

View File

@@ -1 +1,36 @@
"""The wsdot component."""
import wsdot as wsdot_api
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.aiohttp_client import async_get_clientsession
PLATFORMS = [Platform.SENSOR]
type WsdotConfigEntry = ConfigEntry[wsdot_api.WsdotTravelTimes]
async def async_setup_entry(hass: HomeAssistant, entry: WsdotConfigEntry) -> bool:
"""Set up wsdot as config entry."""
session = async_get_clientsession(hass)
api_key = entry.data[CONF_API_KEY]
wsdot_travel_times = wsdot_api.WsdotTravelTimes(api_key=api_key, session=session)
try:
# The only way to validate the provided API key is to request data
# we don't need the data here, only the non-exception
await wsdot_travel_times.get_all_travel_times()
except wsdot_api.WsdotTravelError as api_error:
raise ConfigEntryError("Bad auth") from api_error
entry.runtime_data = wsdot_travel_times
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -0,0 +1,157 @@
"""Adds config flow for wsdot."""
import logging
from types import MappingProxyType
from typing import Any
import voluptuous as vol
import wsdot as wsdot_api
from homeassistant.config_entries import (
SOURCE_USER,
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
ConfigSubentry,
ConfigSubentryFlow,
SubentryFlowResult,
)
from homeassistant.const import CONF_API_KEY, CONF_ID, CONF_NAME
from homeassistant.core import callback
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
from .const import CONF_TRAVEL_TIMES, DOMAIN, SUBENTRY_TRAVEL_TIMES
_LOGGER = logging.getLogger(__name__)
class WSDOTConfigFlow(ConfigFlow, domain=DOMAIN):
"""Config flow for WSDOT."""
VERSION = 1
MINOR_VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
errors = {}
if user_input is not None:
data = {CONF_API_KEY: user_input[CONF_API_KEY]}
self._async_abort_entries_match(data)
wsdot_travel_times = wsdot_api.WsdotTravelTimes(user_input[CONF_API_KEY])
try:
await wsdot_travel_times.get_all_travel_times()
except wsdot_api.WsdotTravelError as ws_error:
if ws_error.status == 400:
errors[CONF_API_KEY] = "invalid_api_key"
else:
errors["base"] = "cannot_connect"
else:
return self.async_create_entry(
title=DOMAIN,
data=data,
)
return self.async_show_form(
step_id=SOURCE_USER,
data_schema=vol.Schema(
{
vol.Required(CONF_API_KEY): str,
}
),
errors=errors,
)
async def async_step_import(self, import_info: dict[str, Any]) -> ConfigFlowResult:
"""Handle a flow initialized by import."""
self._async_abort_entries_match({CONF_API_KEY: import_info[CONF_API_KEY]})
wsdot_travel_times = wsdot_api.WsdotTravelTimes(import_info[CONF_API_KEY])
try:
travel_time_routes = await wsdot_travel_times.get_all_travel_times()
except wsdot_api.WsdotTravelError as ws_error:
if ws_error.status == 400:
return self.async_abort(reason="invalid_api_key")
return self.async_abort(reason="cannot_connect")
subentries = []
for route in import_info[CONF_TRAVEL_TIMES]:
maybe_travel_time = [
tt
for tt in travel_time_routes
# old platform configs could store the id as either a str or an int
if str(tt.TravelTimeID) == str(route[CONF_ID])
]
if not maybe_travel_time:
return self.async_abort(reason="invalid_travel_time_id")
travel_time = maybe_travel_time[0]
route_name = travel_time.Name
unique_id = "_".join(travel_time.Name.split())
subentries.append(
ConfigSubentry(
subentry_type=SUBENTRY_TRAVEL_TIMES,
unique_id=unique_id,
title=route_name,
data=MappingProxyType(
{CONF_NAME: travel_time.Name, CONF_ID: travel_time.TravelTimeID}
),
).as_dict()
)
return self.async_create_entry(
title=DOMAIN,
data={
CONF_API_KEY: import_info[CONF_API_KEY],
},
subentries=subentries,
)
@classmethod
@callback
def async_get_supported_subentry_types(
cls, config_entry: ConfigEntry
) -> dict[str, type[ConfigSubentryFlow]]:
"""Return subentries supported by wsdot."""
return {SUBENTRY_TRAVEL_TIMES: TravelTimeSubentryFlowHandler}
class TravelTimeSubentryFlowHandler(ConfigSubentryFlow):
"""Handle subentry flow for adding WSDOT Travel Times."""
def __init__(self, *args, **kwargs) -> None:
"""Initialize TravelTimeSubentryFlowHandler."""
super().__init__(*args, **kwargs)
self.travel_times: dict[str, int] | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Add a new Travel Time subentry."""
if self.travel_times is None:
client = self._get_entry().runtime_data
travel_times = await client.get_all_travel_times()
self.travel_times = {tt.Name: tt.TravelTimeID for tt in travel_times}
if user_input is not None:
name = user_input[CONF_NAME]
tt_id = self.travel_times[name]
unique_id = str(tt_id)
data = {CONF_NAME: name, CONF_ID: tt_id}
for entry in self.hass.config_entries.async_entries(DOMAIN):
for subentry in entry.subentries.values():
if subentry.unique_id == unique_id:
return self.async_abort(reason="already_configured")
return self.async_create_entry(title=name, unique_id=unique_id, data=data)
names = SelectSelector(
SelectSelectorConfig(
options=list(self.travel_times.keys()),
sort=True,
)
)
return self.async_show_form(
step_id=SOURCE_USER,
data_schema=vol.Schema({vol.Required(CONF_NAME): names}),
errors={},
)

View File

@@ -0,0 +1,11 @@
"""Constants for wsdot component."""
ATTRIBUTION = "Data provided by WSDOT"
CONF_DATA = "data"
CONF_TITLE = "title"
CONF_TRAVEL_TIMES = "travel_time"
DOMAIN = "wsdot"
SUBENTRY_TRAVEL_TIMES = "travel_time"

View File

@@ -2,7 +2,9 @@
"domain": "wsdot",
"name": "Washington State Department of Transportation (WSDOT)",
"codeowners": [],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/wsdot",
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["wsdot"],
"quality_scale": "legacy",

View File

@@ -7,27 +7,29 @@ import logging
from typing import Any
import voluptuous as vol
from wsdot import TravelTime, WsdotTravelError, WsdotTravelTimes
import wsdot as wsdot_api
from homeassistant.components.sensor import (
PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
SensorEntity,
)
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import CONF_API_KEY, CONF_ID, CONF_NAME, UnitOfTime
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import config_validation as cv, issue_registry as ir
from homeassistant.helpers.entity_platform import (
AddConfigEntryEntitiesCallback,
AddEntitiesCallback,
)
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import WsdotConfigEntry
from .const import ATTRIBUTION, CONF_TRAVEL_TIMES, DOMAIN
_LOGGER = logging.getLogger(__name__)
ATTRIBUTION = "Data provided by WSDOT"
CONF_TRAVEL_TIMES = "travel_time"
ICON = "mdi:car"
DOMAIN = "wsdot"
SCAN_INTERVAL = timedelta(minutes=3)
@@ -44,22 +46,61 @@ PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the WSDOT sensor."""
sensors = []
session = async_get_clientsession(hass)
api_key = config[CONF_API_KEY]
wsdot_travel = WsdotTravelTimes(api_key=api_key, session=session)
for travel_time in config[CONF_TRAVEL_TIMES]:
name = travel_time.get(CONF_NAME) or travel_time.get(CONF_ID)
travel_time_id = int(travel_time[CONF_ID])
sensors.append(
WashingtonStateTravelTimeSensor(name, wsdot_travel, travel_time_id)
"""Migrate a platform-style wsdot to entry-style."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=config,
)
if (
result.get("type") is FlowResultType.ABORT
and result.get("reason") != "already_configured"
):
ir.async_create_issue(
hass,
DOMAIN,
f"deprecated_yaml_import_issue_{result.get('reason')}",
breaks_in_ha_version="2026.7.0",
is_fixable=False,
is_persistent=True,
issue_domain=DOMAIN,
severity=ir.IssueSeverity.WARNING,
translation_key=f"deprecated_yaml_import_issue_{result.get('reason')}",
translation_placeholders={"domain": DOMAIN, "integration_title": "WSDOT"},
)
return
ir.async_create_issue(
hass,
HOMEASSISTANT_DOMAIN,
"deprecated_yaml",
breaks_in_ha_version="2026.7.0",
is_fixable=False,
is_persistent=True,
issue_domain=DOMAIN,
severity=ir.IssueSeverity.WARNING,
translation_key="deprecated_yaml",
translation_placeholders={"domain": DOMAIN, "integration_title": "WSDOT"},
)
add_entities(sensors, True)
async def async_setup_entry(
hass: HomeAssistant,
entry: WsdotConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the WSDOT sensor."""
for subentry_id, subentry in entry.subentries.items():
name = subentry.data[CONF_NAME]
travel_time_id = subentry.data[CONF_ID]
sensor = WashingtonStateTravelTimeSensor(
name, entry.runtime_data, travel_time_id
)
async_add_entities(
[sensor], config_subentry_id=subentry_id, update_before_add=True
)
class WashingtonStateTransportSensor(SensorEntity):
@@ -70,6 +111,7 @@ class WashingtonStateTransportSensor(SensorEntity):
can read them and make them available.
"""
_attr_attribution = ATTRIBUTION
_attr_icon = ICON
def __init__(self, name: str) -> None:
@@ -91,23 +133,23 @@ class WashingtonStateTransportSensor(SensorEntity):
class WashingtonStateTravelTimeSensor(WashingtonStateTransportSensor):
"""Travel time sensor from WSDOT."""
_attr_attribution = ATTRIBUTION
_attr_native_unit_of_measurement = UnitOfTime.MINUTES
def __init__(
self, name: str, wsdot_travel: WsdotTravelTimes, travel_time_id: int
self, name: str, wsdot_travel: wsdot_api.WsdotTravelTimes, travel_time_id: int
) -> None:
"""Construct a travel time sensor."""
super().__init__(name)
self._data: TravelTime | None = None
self._data: wsdot_api.TravelTime | None = None
self._travel_time_id = travel_time_id
self._wsdot_travel = wsdot_travel
self._attr_unique_id = f"travel_time-{travel_time_id}"
async def async_update(self) -> None:
"""Get the latest data from WSDOT."""
try:
travel_time = await self._wsdot_travel.get_travel_time(self._travel_time_id)
except WsdotTravelError:
except wsdot_api.WsdotTravelError:
_LOGGER.warning("Invalid response from WSDOT API")
else:
self._data = travel_time

View File

@@ -0,0 +1,63 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_api_key": "[%key:common::config_flow::error::invalid_api_key%]",
"invalid_travel_time_id": "WSDOT TravelTimeId does not describe a valid travel time route {travel_time_id}",
"no_travel_time": "Travel time route created without a travel time entity"
},
"step": {
"user": {
"data": {
"api_key": "[%key:common::config_flow::data::api_key%]"
},
"data_description": {
"api_key": "Your WSDOT API key."
},
"description": "Set up your Washington State Department of Transportation integration.",
"title": "WSDOT setup"
}
}
},
"config_subentries": {
"travel_time": {
"entry_type": "Travel time",
"initiate_flow": {
"user": "Travel time"
},
"step": {
"user": {
"data": {
"name": "Travel Route Description"
},
"data_description": {
"name": "The official WSDOT description of the route."
},
"description": "Select one of the provided the highway routes to monitor estimated driving time along that route",
"title": "WSDOT travel time setup"
}
}
}
},
"issues": {
"deprecated_yaml_import_issue_cannot_connect": {
"description": "Configuring {domain} using YAML sensor platform is deprecated.\n\nWhile importing your configuration, Home Assistant could not connect to the {domain} API. Please check your internet connection and the status of the {domain} API, then restart Home Assistant to try again, or remove the existing YAML configuration and set the integration up via the UI.",
"title": "[%key:component::wsdot::issues::deprecated_yaml_import_issue_invalid_auth::title%]"
},
"deprecated_yaml_import_issue_invalid_auth": {
"description": "Configuring {domain} using YAML sensor platform is deprecated.\n\nWhile importing your configuration, an invalid API key was found. Please update your YAML configuration, or remove the existing YAML configuration and set the integration up via the UI.",
"title": "{domain} YAML configuration deprecated"
},
"deprecated_yaml_import_issue_invalid_travel_time_id": {
"description": "Configuring {domain} using YAML sensor platform is deprecated.\n\nWhile importing your configuration, an invalid travel_time_id was found. Please update your YAML configuration, or remove the existing YAML configuration and set the integration up via the UI.",
"title": "[%key:component::wsdot::issues::deprecated_yaml_import_issue_invalid_auth::title%]"
},
"deprecated_yaml_import_issue_unknown": {
"description": "Configuring {domain} using YAML sensor platform is deprecated.\n\nWhile importing your configuration, an unknown error occurred. Please restart Home Assistant to try again, or remove the existing YAML configuration and set the integration up via the UI.",
"title": "[%key:component::wsdot::issues::deprecated_yaml_import_issue_invalid_auth::title%]"
}
}
}

View File

@@ -17,7 +17,7 @@ if TYPE_CHECKING:
APPLICATION_NAME: Final = "HomeAssistant"
MAJOR_VERSION: Final = 2026
MINOR_VERSION: Final = 1
PATCH_VERSION: Final = "0.dev0"
PATCH_VERSION: Final = "0b0"
__short_version__: Final = f"{MAJOR_VERSION}.{MINOR_VERSION}"
__version__: Final = f"{__short_version__}.{PATCH_VERSION}"
REQUIRED_PYTHON_VER: Final[tuple[int, int, int]] = (3, 13, 2)

View File

@@ -39,7 +39,6 @@ APPLICATION_CREDENTIALS = [
"spotify",
"tesla_fleet",
"teslemetry",
"tibber",
"twitch",
"volvo",
"watts",

View File

@@ -776,6 +776,7 @@ FLOWS = {
"workday",
"worldclock",
"ws66i",
"wsdot",
"wyoming",
"xbox",
"xiaomi_aqara",

View File

@@ -7638,8 +7638,8 @@
},
"wsdot": {
"name": "Washington State Department of Transportation (WSDOT)",
"integration_type": "hub",
"config_flow": false,
"integration_type": "service",
"config_flow": true,
"iot_class": "cloud_polling"
},
"wyoming": {

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "homeassistant"
version = "2026.1.0.dev0"
version = "2026.1.0b0"
license = "Apache-2.0"
license-files = ["LICENSE*", "homeassistant/backports/LICENSE*"]
description = "Open-source home automation platform running on Python 3."

2
requirements_all.txt generated
View File

@@ -1867,7 +1867,7 @@ pyRFXtrx==0.31.1
pySDCP==1
# homeassistant.components.tibber
pyTibber==0.33.1
pyTibber==0.32.2
# homeassistant.components.dlink
pyW215==0.8.0

View File

@@ -1595,7 +1595,7 @@ pyHomee==1.3.8
pyRFXtrx==0.31.1
# homeassistant.components.tibber
pyTibber==0.33.1
pyTibber==0.32.2
# homeassistant.components.dlink
pyW215==0.8.0

View File

@@ -48,7 +48,7 @@
},
"switch": {
"switch": {
"value": "off",
"value": "on",
"timestamp": "2025-11-11T23:41:24.907Z"
}
},
@@ -305,7 +305,7 @@
"timestamp": "2025-11-11T23:41:21.525Z"
},
"hoodFanSpeed": {
"value": 14,
"value": 15,
"timestamp": "2025-11-11T23:41:21.525Z"
},
"supportedHoodFanSpeed": {

View File

@@ -1,4 +1,63 @@
# serializer version: 1
# name: test_all_entities[da_ks_hood_01001][fan.range_hood-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'preset_modes': list([
'smart',
]),
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'fan',
'entity_category': None,
'entity_id': 'fan.range_hood',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': None,
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <FanEntityFeature: 57>,
'translation_key': 'hood',
'unique_id': 'fa5fca25-fa7a-1807-030a-2f72ee0f7bff_main',
'unit_of_measurement': None,
})
# ---
# name: test_all_entities[da_ks_hood_01001][fan.range_hood-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Range hood',
'percentage': 25,
'percentage_step': 25.0,
'preset_mode': None,
'preset_modes': list([
'smart',
]),
'supported_features': <FanEntityFeature: 57>,
}),
'context': <ANY>,
'entity_id': 'fan.range_hood',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'on',
})
# ---
# name: test_all_entities[fake_fan][fan.fake_fan-entry]
EntityRegistryEntrySnapshot({
'aliases': set({

View File

@@ -236,7 +236,7 @@
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'off',
'state': 'on',
})
# ---
# name: test_all_entities[da_ks_walloven_0107x][switch.four_sabbath_mode-entry]

View File

@@ -2,7 +2,7 @@
from unittest.mock import AsyncMock
from pysmartthings import Capability, Command
from pysmartthings import Attribute, Capability, Command
from pysmartthings.models import HealthStatus
import pytest
from syrupy.assertion import SnapshotAssertion
@@ -26,7 +26,12 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from . import setup_integration, snapshot_smartthings_entities, trigger_health_update
from . import (
setup_integration,
snapshot_smartthings_entities,
trigger_health_update,
trigger_update,
)
from tests.common import MockConfigEntry
@@ -44,7 +49,13 @@ async def test_all_entities(
snapshot_smartthings_entities(hass, entity_registry, snapshot, Platform.FAN)
@pytest.mark.parametrize("device_fixture", ["fake_fan"])
@pytest.mark.parametrize(
("device_fixture", "entity_id", "device_id"),
[
("fake_fan", "fan.fake_fan", "f1af21a2-d5a1-437c-b10a-b34a87394b71"),
("da_ks_hood_01001", "fan.range_hood", "fa5fca25-fa7a-1807-030a-2f72ee0f7bff"),
],
)
@pytest.mark.parametrize(
("action", "command"),
[
@@ -58,6 +69,8 @@ async def test_turn_on_off(
mock_config_entry: MockConfigEntry,
action: str,
command: Command,
entity_id: str,
device_id: str,
) -> None:
"""Test turning on and off."""
await setup_integration(hass, mock_config_entry)
@@ -65,11 +78,11 @@ async def test_turn_on_off(
await hass.services.async_call(
FAN_DOMAIN,
action,
{ATTR_ENTITY_ID: "fan.fake_fan"},
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
devices.execute_device_command.assert_called_once_with(
"f1af21a2-d5a1-437c-b10a-b34a87394b71",
device_id,
Capability.SWITCH,
command,
MAIN,
@@ -100,11 +113,19 @@ async def test_set_percentage(
)
@pytest.mark.parametrize("device_fixture", ["fake_fan"])
@pytest.mark.parametrize(
("device_fixture", "entity_id", "device_id"),
[
("fake_fan", "fan.fake_fan", "f1af21a2-d5a1-437c-b10a-b34a87394b71"),
("da_ks_hood_01001", "fan.range_hood", "fa5fca25-fa7a-1807-030a-2f72ee0f7bff"),
],
)
async def test_set_percentage_off(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
entity_id: str,
device_id: str,
) -> None:
"""Test setting the speed percentage of the fan."""
await setup_integration(hass, mock_config_entry)
@@ -112,11 +133,11 @@ async def test_set_percentage_off(
await hass.services.async_call(
FAN_DOMAIN,
SERVICE_SET_PERCENTAGE,
{ATTR_ENTITY_ID: "fan.fake_fan", ATTR_PERCENTAGE: 0},
{ATTR_ENTITY_ID: entity_id, ATTR_PERCENTAGE: 0},
blocking=True,
)
devices.execute_device_command.assert_called_once_with(
"f1af21a2-d5a1-437c-b10a-b34a87394b71",
device_id,
Capability.SWITCH,
Command.OFF,
MAIN,
@@ -204,3 +225,80 @@ async def test_availability_at_start(
"""Test unavailable at boot."""
await setup_integration(hass, mock_config_entry)
assert hass.states.get("fan.fake_fan").state == STATE_UNAVAILABLE
@pytest.mark.parametrize("device_fixture", ["da_ks_hood_01001"])
async def test_set_hood_percentage(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test setting the speed percentage of the hood."""
await setup_integration(hass, mock_config_entry)
await hass.services.async_call(
FAN_DOMAIN,
SERVICE_SET_PERCENTAGE,
{ATTR_ENTITY_ID: "fan.range_hood", ATTR_PERCENTAGE: 50},
blocking=True,
)
devices.execute_device_command.assert_called_once_with(
"fa5fca25-fa7a-1807-030a-2f72ee0f7bff",
Capability.SAMSUNG_CE_HOOD_FAN_SPEED,
Command.SET_HOOD_FAN_SPEED,
MAIN,
argument=16,
)
@pytest.mark.parametrize("device_fixture", ["da_ks_hood_01001"])
async def test_set_hood_preset_mode(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test setting the preset mode of the hood."""
await setup_integration(hass, mock_config_entry)
await hass.services.async_call(
FAN_DOMAIN,
SERVICE_SET_PRESET_MODE,
{ATTR_ENTITY_ID: "fan.range_hood", ATTR_PRESET_MODE: "smart"},
blocking=True,
)
devices.execute_device_command.assert_called_once_with(
"fa5fca25-fa7a-1807-030a-2f72ee0f7bff",
Capability.SAMSUNG_CE_HOOD_FAN_SPEED,
Command.SET_HOOD_FAN_SPEED,
MAIN,
argument=14,
)
@pytest.mark.parametrize("device_fixture", ["da_ks_hood_01001"])
async def test_updating_hood_preset_mode(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test updating the preset mode of the hood."""
await setup_integration(hass, mock_config_entry)
state = hass.states.get("fan.range_hood")
assert state
assert state.attributes[ATTR_PRESET_MODE] is None
assert state.attributes[ATTR_PERCENTAGE] == 25
await trigger_update(
hass,
devices,
"fa5fca25-fa7a-1807-030a-2f72ee0f7bff",
Capability.SAMSUNG_CE_HOOD_FAN_SPEED,
Attribute.HOOD_FAN_SPEED,
14,
)
state = hass.states.get("fan.range_hood")
assert state
assert state.attributes[ATTR_PRESET_MODE] == "smart"
assert state.attributes[ATTR_PERCENTAGE] is None

View File

@@ -1,76 +1,24 @@
"""Test helpers for Tibber."""
from collections.abc import AsyncGenerator
import time
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
import pytest
import tibber
from homeassistant.components.application_credentials import (
ClientCredential,
async_import_client_credential,
)
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber.const import AUTH_IMPLEMENTATION, DOMAIN
from homeassistant.components.tibber.const import DOMAIN
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry
def create_tibber_device(
device_id: str = "device-id",
external_id: str = "external-id",
name: str = "Test Device",
brand: str = "Tibber",
model: str = "Gen1",
value: float | None = 72.0,
home_id: str = "home-id",
) -> tibber.data_api.TibberDevice:
"""Create a fake Tibber Data API device."""
device_data = {
"id": device_id,
"externalId": external_id,
"info": {
"name": name,
"brand": brand,
"model": model,
},
"capabilities": [
{
"id": "storage.stateOfCharge",
"value": value,
"description": "State of charge",
"unit": "%",
},
{
"id": "unknown.sensor.id",
"value": None,
"description": "Unknown",
"unit": "",
},
],
}
return tibber.data_api.TibberDevice(device_data, home_id=home_id)
@pytest.fixture
def config_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Tibber config entry."""
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_ACCESS_TOKEN: "token",
AUTH_IMPLEMENTATION: DOMAIN,
"token": {
"access_token": "test-token",
"refresh_token": "refresh-token",
"token_type": "Bearer",
"expires_at": time.time() + 3600,
},
},
data={CONF_ACCESS_TOKEN: "token"},
unique_id="tibber",
)
config_entry.add_to_hass(hass)
@@ -78,69 +26,21 @@ def config_entry(hass: HomeAssistant) -> MockConfigEntry:
@pytest.fixture
def _tibber_patches() -> AsyncGenerator[tuple[MagicMock, MagicMock]]:
"""Patch the Tibber libraries used by the integration."""
async def mock_tibber_setup(
recorder_mock: Recorder, config_entry: MockConfigEntry, hass: HomeAssistant
) -> AsyncGenerator[MagicMock]:
"""Mock tibber entry setup."""
unique_user_id = "unique_user_id"
title = "title"
with (
patch(
"tibber.Tibber",
autospec=True,
) as mock_tibber,
patch(
"tibber.data_api.TibberDataAPI",
autospec=True,
) as mock_data_api_client,
):
tibber_mock = mock_tibber.return_value
tibber_mock.update_info = AsyncMock(return_value=True)
tibber_mock.user_id = unique_user_id
tibber_mock.name = title
tibber_mock.send_notification = AsyncMock()
tibber_mock.rt_disconnect = AsyncMock()
tibber_mock.get_homes = MagicMock(return_value=[])
tibber_mock = MagicMock()
tibber_mock.update_info = AsyncMock(return_value=True)
tibber_mock.user_id = PropertyMock(return_value=unique_user_id)
tibber_mock.name = PropertyMock(return_value=title)
tibber_mock.send_notification = AsyncMock()
tibber_mock.rt_disconnect = AsyncMock()
data_api_client_mock = mock_data_api_client.return_value
data_api_client_mock.get_all_devices = AsyncMock(return_value={})
data_api_client_mock.update_devices = AsyncMock(return_value={})
yield tibber_mock, data_api_client_mock
@pytest.fixture
def tibber_mock(_tibber_patches: tuple[MagicMock, MagicMock]) -> MagicMock:
"""Return the patched Tibber connection mock."""
return _tibber_patches[0]
@pytest.fixture
def data_api_client_mock(_tibber_patches: tuple[MagicMock, MagicMock]) -> MagicMock:
"""Return the patched Tibber Data API client mock."""
return _tibber_patches[1]
@pytest.fixture
async def mock_tibber_setup(
recorder_mock: Recorder,
config_entry: MockConfigEntry,
hass: HomeAssistant,
tibber_mock: MagicMock,
setup_credentials: None,
) -> MagicMock:
"""Mock tibber entry setup."""
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return tibber_mock
@pytest.fixture
async def setup_credentials(hass: HomeAssistant) -> None:
"""Set up application credentials for the OAuth flow."""
assert await async_setup_component(hass, "application_credentials", {})
await async_import_client_credential(
hass,
DOMAIN,
ClientCredential("test-client-id", "test-client-secret"),
DOMAIN,
)
with patch("tibber.Tibber", return_value=tibber_mock):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
yield tibber_mock

View File

@@ -1,9 +1,7 @@
"""Tests for Tibber config flow."""
import builtins
from http import HTTPStatus
from unittest.mock import AsyncMock, MagicMock, patch
from urllib.parse import parse_qs, urlparse
from asyncio import TimeoutError
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
from aiohttp import ClientError
import pytest
@@ -15,22 +13,16 @@ from tibber import (
from homeassistant import config_entries
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber.application_credentials import TOKEN_URL
from homeassistant.components.tibber.config_flow import (
DATA_API_DEFAULT_SCOPES,
ERR_CLIENT,
ERR_TIMEOUT,
ERR_TOKEN,
)
from homeassistant.components.tibber.const import AUTH_IMPLEMENTATION, DOMAIN
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN
from homeassistant.components.tibber.const import DOMAIN
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker
from tests.typing import ClientSessionGenerator
@pytest.fixture(name="tibber_setup", autouse=True)
def tibber_setup_fixture():
@@ -39,22 +31,6 @@ def tibber_setup_fixture():
yield
def _mock_tibber(
tibber_mock: MagicMock,
*,
user_id: str = "unique_user_id",
title: str = "Mock Name",
update_side_effect: Exception | None = None,
) -> MagicMock:
"""Configure the patched Tibber GraphQL client."""
tibber_mock.user_id = user_id
tibber_mock.name = title
tibber_mock.update_info = AsyncMock()
if update_side_effect is not None:
tibber_mock.update_info.side_effect = update_side_effect
return tibber_mock
async def test_show_config_form(recorder_mock: Recorder, hass: HomeAssistant) -> None:
"""Test show configuration form."""
result = await hass.config_entries.flow.async_init(
@@ -65,239 +41,77 @@ async def test_show_config_form(recorder_mock: Recorder, hass: HomeAssistant) ->
assert result["step_id"] == "user"
async def test_create_entry(recorder_mock: Recorder, hass: HomeAssistant) -> None:
"""Test create entry from user input."""
test_data = {
CONF_ACCESS_TOKEN: "valid",
}
unique_user_id = "unique_user_id"
title = "title"
tibber_mock = MagicMock()
type(tibber_mock).update_info = AsyncMock(return_value=True)
type(tibber_mock).user_id = PropertyMock(return_value=unique_user_id)
type(tibber_mock).name = PropertyMock(return_value=title)
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == title
assert result["data"] == test_data
@pytest.mark.parametrize(
("exception", "expected_error"),
[
(builtins.TimeoutError(), ERR_TIMEOUT),
(ClientError(), ERR_CLIENT),
(TimeoutError, ERR_TIMEOUT),
(ClientError, ERR_CLIENT),
(InvalidLoginError(401), ERR_TOKEN),
(RetryableHttpExceptionError(503), ERR_CLIENT),
(FatalHttpExceptionError(404), ERR_CLIENT),
],
)
async def test_graphql_step_exceptions(
recorder_mock: Recorder,
hass: HomeAssistant,
tibber_mock: MagicMock,
exception: Exception,
expected_error: str,
async def test_create_entry_exceptions(
recorder_mock: Recorder, hass: HomeAssistant, exception, expected_error
) -> None:
"""Validate GraphQL errors are surfaced."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
"""Test create entry from user input."""
test_data = {
CONF_ACCESS_TOKEN: "valid",
}
_mock_tibber(tibber_mock, update_side_effect=exception)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "invalid"}
)
unique_user_id = "unique_user_id"
title = "title"
tibber_mock = MagicMock()
type(tibber_mock).update_info = AsyncMock(side_effect=exception)
type(tibber_mock).user_id = PropertyMock(return_value=unique_user_id)
type(tibber_mock).name = PropertyMock(return_value=title)
with patch("tibber.Tibber", return_value=tibber_mock):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"][CONF_ACCESS_TOKEN] == expected_error
async def test_flow_entry_already_exists(
recorder_mock: Recorder,
hass: HomeAssistant,
config_entry,
tibber_mock: MagicMock,
recorder_mock: Recorder, hass: HomeAssistant, config_entry
) -> None:
"""Test user input for config_entry that already exists."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
test_data = {
CONF_ACCESS_TOKEN: "valid",
}
_mock_tibber(tibber_mock, user_id="tibber")
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_reauth_flow_steps(
recorder_mock: Recorder,
hass: HomeAssistant,
config_entry: MockConfigEntry,
) -> None:
"""Test the reauth flow goes through reauth_confirm to user step."""
reauth_flow = await config_entry.start_reauth_flow(hass)
assert reauth_flow["type"] is FlowResultType.FORM
assert reauth_flow["step_id"] == "reauth_confirm"
result = await hass.config_entries.flow.async_configure(reauth_flow["flow_id"])
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
result = await hass.config_entries.flow.async_configure(
reauth_flow["flow_id"],
user_input={},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
async def test_oauth_create_entry_missing_configuration(
recorder_mock: Recorder,
hass: HomeAssistant,
) -> None:
"""Abort OAuth finalize if GraphQL step did not run."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
handler = hass.config_entries.flow._progress[result["flow_id"]]
flow_result = await handler.async_oauth_create_entry(
{CONF_TOKEN: {CONF_ACCESS_TOKEN: "rest-token"}}
)
assert flow_result["type"] is FlowResultType.ABORT
assert flow_result["reason"] == "missing_configuration"
async def test_oauth_create_entry_cannot_connect_userinfo(
recorder_mock: Recorder,
hass: HomeAssistant,
data_api_client_mock: MagicMock,
) -> None:
"""Abort OAuth finalize when Data API userinfo cannot be retrieved."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
handler = hass.config_entries.flow._progress[result["flow_id"]]
handler._access_token = "graphql-token"
data_api_client_mock.get_userinfo = AsyncMock(side_effect=ClientError())
flow_result = await handler.async_oauth_create_entry(
{CONF_TOKEN: {CONF_ACCESS_TOKEN: "rest-token"}}
)
assert flow_result["type"] is FlowResultType.ABORT
assert flow_result["reason"] == "cannot_connect"
async def test_data_api_requires_credentials(
recorder_mock: Recorder,
hass: HomeAssistant,
tibber_mock: MagicMock,
) -> None:
"""Abort when OAuth credentials are missing."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
_mock_tibber(tibber_mock)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "missing_credentials"
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
async def test_data_api_extra_authorize_scope(
hass: HomeAssistant,
tibber_mock: MagicMock,
) -> None:
"""Ensure the OAuth implementation requests Tibber scopes."""
with patch("homeassistant.components.recorder.async_setup", return_value=True):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
_mock_tibber(tibber_mock)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "valid"}
)
handler = hass.config_entries.flow._progress[result["flow_id"]]
assert handler.extra_authorize_data["scope"] == " ".join(
DATA_API_DEFAULT_SCOPES
)
@pytest.mark.usefixtures("setup_credentials", "current_request_with_host")
async def test_full_flow_success(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
tibber_mock: MagicMock,
data_api_client_mock: MagicMock,
) -> None:
"""Test configuring Tibber via GraphQL + OAuth."""
with patch("homeassistant.components.recorder.async_setup", return_value=True):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
_mock_tibber(tibber_mock)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "graphql-token"}
)
assert result["type"] is FlowResultType.EXTERNAL_STEP
authorize_url = result["url"]
state = parse_qs(urlparse(authorize_url).query)["state"][0]
client = await hass_client_no_auth()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == HTTPStatus.OK
aioclient_mock.post(
TOKEN_URL,
json={
"access_token": "mock-access-token",
"refresh_token": "mock-refresh-token",
"token_type": "bearer",
"expires_in": 3600,
},
)
data_api_client_mock.get_userinfo = AsyncMock(
return_value={"name": "Mock Name"}
)
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] is FlowResultType.CREATE_ENTRY
data = result["data"]
assert data[CONF_TOKEN]["access_token"] == "mock-access-token"
assert data[CONF_ACCESS_TOKEN] == "graphql-token"
assert data[AUTH_IMPLEMENTATION] == DOMAIN
assert result["title"] == "Mock Name"
async def test_data_api_abort_when_already_configured(
recorder_mock: Recorder,
hass: HomeAssistant,
tibber_mock: MagicMock,
) -> None:
"""Ensure only a single Data API entry can be configured."""
existing_entry = MockConfigEntry(
domain=DOMAIN,
data={
AUTH_IMPLEMENTATION: DOMAIN,
CONF_TOKEN: {"access_token": "existing"},
CONF_ACCESS_TOKEN: "stored-graphql",
},
unique_id="unique_user_id",
title="Existing",
)
existing_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
_mock_tibber(tibber_mock)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_ACCESS_TOKEN: "new-token"}
)
with patch("tibber.Tibber.update_info", return_value=None):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=test_data
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"

View File

@@ -1,178 +1,56 @@
"""Test the Tibber diagnostics."""
"""Test the Netatmo diagnostics."""
from unittest.mock import AsyncMock, MagicMock
import aiohttp
import pytest
import tibber
from unittest.mock import patch
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber.diagnostics import (
async_get_config_entry_diagnostics,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.setup import async_setup_component
from .conftest import create_tibber_device
from .test_common import mock_get_homes
from tests.common import MockConfigEntry
from tests.components.diagnostics import get_diagnostics_for_config_entry
from tests.typing import ClientSessionGenerator
async def test_entry_diagnostics_empty(
async def test_entry_diagnostics(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
config_entry: MockConfigEntry,
mock_tibber_setup: MagicMock,
config_entry,
) -> None:
"""Test config entry diagnostics with no homes."""
tibber_mock = mock_tibber_setup
tibber_mock.get_homes.return_value = []
"""Test config entry diagnostics."""
with patch(
"tibber.Tibber.update_info",
return_value=None,
):
assert await async_setup_component(hass, "tibber", {})
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
assert isinstance(result, dict)
assert "homes" in result
assert "devices" in result
assert result["homes"] == []
assert result["devices"] == []
async def test_entry_diagnostics_with_homes(
recorder_mock: Recorder,
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
config_entry: MockConfigEntry,
mock_tibber_setup: MagicMock,
) -> None:
"""Test config entry diagnostics with homes."""
tibber_mock = mock_tibber_setup
tibber_mock.get_homes.side_effect = mock_get_homes
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
assert isinstance(result, dict)
assert "homes" in result
assert "devices" in result
homes = result["homes"]
assert isinstance(homes, list)
assert len(homes) == 1
home = homes[0]
assert "last_data_timestamp" in home
assert "has_active_subscription" in home
assert "has_real_time_consumption" in home
assert "last_cons_data_timestamp" in home
assert "country" in home
assert home["has_active_subscription"] is True
assert home["has_real_time_consumption"] is False
assert home["country"] == "NO"
async def test_data_api_diagnostics_no_data(
recorder_mock: Recorder,
hass: HomeAssistant,
config_entry: MockConfigEntry,
data_api_client_mock: MagicMock,
setup_credentials: None,
) -> None:
"""Test Data API diagnostics when coordinator has no data."""
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
data_api_client_mock.get_all_devices.assert_awaited_once()
data_api_client_mock.update_devices.assert_awaited_once()
with patch(
"tibber.Tibber.get_homes",
return_value=[],
):
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
result = await async_get_config_entry_diagnostics(hass, config_entry)
assert isinstance(result, dict)
assert "homes" in result
assert "devices" in result
assert isinstance(result["homes"], list)
assert isinstance(result["devices"], list)
assert result["devices"] == []
async def test_data_api_diagnostics_with_devices(
recorder_mock: Recorder,
hass: HomeAssistant,
config_entry: MockConfigEntry,
data_api_client_mock: MagicMock,
setup_credentials: None,
) -> None:
"""Test Data API diagnostics with successful device retrieval."""
devices = {
"device-1": create_tibber_device(
device_id="device-1",
name="Device 1",
brand="Tibber",
model="Test Model",
),
"device-2": create_tibber_device(
device_id="device-2",
name="Device 2",
brand="Tibber",
model="Test Model",
),
assert result == {
"homes": [],
}
data_api_client_mock.get_all_devices = AsyncMock(return_value=devices)
data_api_client_mock.update_devices = AsyncMock(return_value=devices)
with patch(
"tibber.Tibber.get_homes",
side_effect=mock_get_homes,
):
result = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
result = await async_get_config_entry_diagnostics(hass, config_entry)
assert isinstance(result, dict)
assert "homes" in result
assert "devices" in result
devices_list = result["devices"]
assert isinstance(devices_list, list)
assert len(devices_list) == 2
device_1 = next((d for d in devices_list if d["id"] == "device-1"), None)
assert device_1 is not None
assert device_1["name"] == "Device 1"
assert device_1["brand"] == "Tibber"
assert device_1["model"] == "Test Model"
device_2 = next((d for d in devices_list if d["id"] == "device-2"), None)
assert device_2 is not None
assert device_2["name"] == "Device 2"
assert device_2["brand"] == "Tibber"
assert device_2["model"] == "Test Model"
@pytest.mark.parametrize(
"exception",
[
ConfigEntryAuthFailed("Auth failed"),
TimeoutError(),
aiohttp.ClientError("Connection error"),
tibber.InvalidLoginError(401),
tibber.RetryableHttpExceptionError(503),
tibber.FatalHttpExceptionError(404),
],
)
async def test_data_api_diagnostics_exceptions(
recorder_mock: Recorder,
hass: HomeAssistant,
config_entry: MockConfigEntry,
tibber_mock: MagicMock,
setup_credentials: None,
exception: Exception,
) -> None:
"""Test Data API diagnostics with various exception scenarios."""
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
tibber_mock.get_homes.side_effect = exception
with pytest.raises(type(exception)):
await async_get_config_entry_diagnostics(hass, config_entry)
assert result == {
"homes": [
{
"last_data_timestamp": "2016-01-01T12:48:57",
"has_active_subscription": True,
"has_real_time_consumption": False,
"last_cons_data_timestamp": "2016-01-01T12:44:57",
"country": "NO",
}
],
}

View File

@@ -1,17 +1,11 @@
"""Test loading of the Tibber config entry."""
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
from unittest.mock import MagicMock
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber import DOMAIN, TibberRuntimeData, async_setup_entry
from homeassistant.components.tibber import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from tests.common import MockConfigEntry
async def test_entry_unload(
@@ -25,69 +19,3 @@ async def test_entry_unload(
mock_tibber_setup.rt_disconnect.assert_called_once()
await hass.async_block_till_done(wait_background_tasks=True)
assert entry.state is ConfigEntryState.NOT_LOADED
@pytest.mark.usefixtures("recorder_mock")
async def test_data_api_runtime_creates_client(hass: HomeAssistant) -> None:
"""Ensure the data API runtime creates and caches the client."""
session = MagicMock()
session.async_ensure_token_valid = AsyncMock()
session.token = {CONF_ACCESS_TOKEN: "access-token"}
runtime = TibberRuntimeData(
session=session,
tibber_connection=MagicMock(),
)
with patch(
"homeassistant.components.tibber.tibber_data_api.TibberDataAPI"
) as mock_client_cls:
mock_client = MagicMock()
mock_client.set_access_token = MagicMock()
mock_client_cls.return_value = mock_client
client = await runtime.async_get_client(hass)
mock_client_cls.assert_called_once_with("access-token", websession=ANY)
session.async_ensure_token_valid.assert_awaited_once()
mock_client.set_access_token.assert_called_once_with("access-token")
assert client is mock_client
mock_client.set_access_token.reset_mock()
session.async_ensure_token_valid.reset_mock()
cached_client = await runtime.async_get_client(hass)
mock_client_cls.assert_called_once()
session.async_ensure_token_valid.assert_awaited_once()
mock_client.set_access_token.assert_called_once_with("access-token")
assert cached_client is client
@pytest.mark.usefixtures("recorder_mock")
async def test_data_api_runtime_missing_token_raises(hass: HomeAssistant) -> None:
"""Ensure missing tokens trigger reauthentication."""
session = MagicMock()
session.async_ensure_token_valid = AsyncMock()
session.token = {}
runtime = TibberRuntimeData(
session=session,
tibber_connection=MagicMock(),
)
with pytest.raises(ConfigEntryAuthFailed):
await runtime.async_get_client(hass)
session.async_ensure_token_valid.assert_awaited_once()
async def test_setup_requires_data_api_reauth(hass: HomeAssistant) -> None:
"""Ensure legacy entries trigger reauth to configure Data API."""
entry = MockConfigEntry(
domain=DOMAIN,
data={CONF_ACCESS_TOKEN: "legacy-token"},
unique_id="legacy",
)
with pytest.raises(ConfigEntryAuthFailed):
await async_setup_entry(hass, entry)

View File

@@ -1,45 +0,0 @@
"""Tests for the Tibber Data API sensors and coordinator."""
from __future__ import annotations
from unittest.mock import AsyncMock
from homeassistant.components.recorder import Recorder
from homeassistant.components.tibber.const import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from .conftest import create_tibber_device
from tests.common import MockConfigEntry
async def test_data_api_sensors_are_created(
recorder_mock: Recorder,
hass: HomeAssistant,
config_entry: MockConfigEntry,
data_api_client_mock: AsyncMock,
setup_credentials: None,
entity_registry: er.EntityRegistry,
) -> None:
"""Ensure Data API sensors are created and expose values from the coordinator."""
data_api_client_mock.get_all_devices = AsyncMock(
return_value={"device-id": create_tibber_device(value=72.0)}
)
data_api_client_mock.update_devices = AsyncMock(
return_value={"device-id": create_tibber_device(value=83.0)}
)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
data_api_client_mock.get_all_devices.assert_awaited_once()
data_api_client_mock.update_devices.assert_awaited_once()
unique_id = "external-id_storage.stateOfCharge"
entity_id = entity_registry.async_get_entity_id("sensor", DOMAIN, unique_id)
assert entity_id is not None
state = hass.states.get(entity_id)
assert state is not None
assert float(state.state) == 83.0

View File

@@ -1,24 +1,113 @@
"""Provide common WSDOT fixtures."""
from collections.abc import AsyncGenerator
from unittest.mock import patch
from collections.abc import Generator
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from wsdot import TravelTime
from wsdot import TravelTime, WsdotTravelError
from homeassistant.components.wsdot.sensor import DOMAIN
from homeassistant.components.wsdot.const import DOMAIN
from homeassistant.config_entries import ConfigSubentryData
from homeassistant.const import CONF_API_KEY, CONF_ID, CONF_NAME
from homeassistant.core import HomeAssistant
from tests.common import load_json_object_fixture
from tests.common import MockConfigEntry, load_json_object_fixture
@pytest.fixture
def mock_travel_time() -> AsyncGenerator[TravelTime]:
def mock_travel_time() -> Generator[AsyncMock]:
"""WsdotTravelTimes.get_travel_time is mocked to return a TravelTime data based on test fixture payload."""
with patch(
"homeassistant.components.wsdot.sensor.WsdotTravelTimes", autospec=True
) as mock:
with (
patch(
"homeassistant.components.wsdot.wsdot_api.WsdotTravelTimes", autospec=True
) as mock,
patch(
"homeassistant.components.wsdot.config_flow.wsdot_api.WsdotTravelTimes",
new=mock,
),
):
client = mock.return_value
client.get_travel_time.return_value = TravelTime(
**load_json_object_fixture("wsdot.json", DOMAIN)
response = TravelTime(**load_json_object_fixture("wsdot.json", DOMAIN))
client.get_travel_time.return_value = response
client.get_all_travel_times.return_value = [response]
yield client
@pytest.fixture
def failed_travel_time_status() -> int:
"""Return the default status code for failed travel time requests."""
return 400
@pytest.fixture
def mock_failed_travel_time(
mock_travel_time: AsyncMock, failed_travel_time_status: int
) -> AsyncMock:
"""WsdotTravelTimes.get_travel_time is mocked to raise a WsdotTravelError."""
mock_travel_time.get_travel_time.side_effect = WsdotTravelError(
status=failed_travel_time_status
)
mock_travel_time.get_all_travel_times.side_effect = WsdotTravelError(
status=failed_travel_time_status
)
return mock_travel_time
@pytest.fixture
def mock_config_data() -> dict[str, Any]:
"""Return valid test config data."""
return {
CONF_API_KEY: "abcd-1234",
}
@pytest.fixture
def mock_subentries() -> list[ConfigSubentryData]:
"""Mock subentries."""
return [
ConfigSubentryData(
subentry_type="travel_time",
title="I-90 EB",
unique_id="96",
data={
CONF_ID: 96,
CONF_NAME: "Seattle-Bellevue via I-90 (EB AM)",
},
)
yield mock
]
@pytest.fixture
def mock_config_entry(
mock_config_data: dict[str, Any], mock_subentries: list[ConfigSubentryData]
) -> MockConfigEntry:
"""Mock a wsdot config entry."""
return MockConfigEntry(
domain=DOMAIN,
data=mock_config_data,
subentries_data=mock_subentries,
)
@pytest.fixture
async def init_integration(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> MockConfigEntry:
"""Set up wsdot integration with subentries for testing."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
return mock_config_entry
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock]:
"""Mock config entry setup."""
with patch(
"homeassistant.components.wsdot.async_setup_entry", return_value=True
) as mock_setup:
yield mock_setup

View File

@@ -0,0 +1,75 @@
# serializer version: 1
# name: test_travel_sensor_details[sensor.seattle_bellevue_via_i_90_eb_am-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.seattle_bellevue_via_i_90_eb_am',
'has_entity_name': False,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': 'mdi:car',
'original_name': 'Seattle-Bellevue via I-90 (EB AM)',
'platform': 'wsdot',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': 'travel_time-96',
'unit_of_measurement': <UnitOfTime.MINUTES: 'min'>,
})
# ---
# name: test_travel_sensor_details[sensor.seattle_bellevue_via_i_90_eb_am-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'AverageTime': 11,
'CurrentTime': 11,
'Description': 'Downtown Seattle to Downtown Bellevue via I-90',
'Distance': 10.6,
'EndPoint': dict({
'Description': 'I-405 @ NE 8th St in Bellevue',
'Direction': 'N',
'Latitude': 47.61361,
'Longitude': -122.18797,
'MilePost': 13.6,
'RoadName': 'I-405',
}),
'Name': 'Seattle-Bellevue via I-90 (EB AM)',
'StartPoint': dict({
'Description': 'I-5 @ University St in Seattle',
'Direction': 'S',
'Latitude': 47.609294,
'Longitude': -122.331759,
'MilePost': 165.83,
'RoadName': 'I-5',
}),
'TimeUpdated': datetime.datetime(2017, 1, 21, 15, 10, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=57600))),
'TravelTimeID': 96,
'attribution': 'Data provided by WSDOT',
'friendly_name': 'Seattle-Bellevue via I-90 (EB AM)',
'icon': 'mdi:car',
'unit_of_measurement': <UnitOfTime.MINUTES: 'min'>,
}),
'context': <ANY>,
'entity_id': 'sensor.seattle_bellevue_via_i_90_eb_am',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '11',
})
# ---

View File

@@ -0,0 +1,282 @@
"""Define tests for the wsdot config flow."""
from typing import Any
from unittest.mock import AsyncMock
import pytest
from wsdot import WsdotTravelError
from homeassistant.components.wsdot.const import (
CONF_TRAVEL_TIMES,
DOMAIN,
SUBENTRY_TRAVEL_TIMES,
)
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
from homeassistant.const import CONF_API_KEY, CONF_ID, CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
VALID_USER_CONFIG = {
CONF_API_KEY: "abcd-1234",
}
VALID_USER_TRAVEL_TIME_CONFIG = {
CONF_NAME: "Seattle-Bellevue via I-90 (EB AM)",
}
async def test_create_user_entry(
hass: HomeAssistant, mock_travel_time: AsyncMock
) -> None:
"""Test that the user step works."""
# No user data; form is being show for the first time
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# User data; the user entered data and hit submit
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=VALID_USER_CONFIG,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == DOMAIN
assert result["data"][CONF_API_KEY] == "abcd-1234"
@pytest.mark.parametrize(
("failed_travel_time_status", "errors"),
[
(400, {CONF_API_KEY: "invalid_api_key"}),
(404, {"base": "cannot_connect"}),
],
)
async def test_errors(
hass: HomeAssistant,
mock_travel_time: AsyncMock,
mock_setup_entry: AsyncMock,
failed_travel_time_status: int,
errors: dict[str, str],
) -> None:
"""Test that the user step works."""
mock_travel_time.get_all_travel_times.side_effect = WsdotTravelError(
status=failed_travel_time_status
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=VALID_USER_CONFIG,
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == errors
mock_travel_time.get_all_travel_times.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=VALID_USER_CONFIG,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
@pytest.mark.parametrize(
"mock_subentries",
[
[],
],
)
async def test_create_travel_time_subentry(
hass: HomeAssistant,
mock_travel_time: AsyncMock,
init_integration: MockConfigEntry,
) -> None:
"""Test that the user step for Travel Time works."""
# No user data; form is being show for the first time
result = await hass.config_entries.subentries.async_init(
(init_integration.entry_id, SUBENTRY_TRAVEL_TIMES),
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# User data; the user made a choice and hit submit
result = await hass.config_entries.subentries.async_init(
(init_integration.entry_id, SUBENTRY_TRAVEL_TIMES),
context={"source": SOURCE_USER},
data=VALID_USER_TRAVEL_TIME_CONFIG,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"][CONF_NAME] == "Seattle-Bellevue via I-90 (EB AM)"
assert result["data"][CONF_ID] == 96
@pytest.mark.parametrize(
"import_config",
[
{
CONF_API_KEY: "abcd-5678",
CONF_TRAVEL_TIMES: [{CONF_ID: 96, CONF_NAME: "I-90 EB"}],
},
{
CONF_API_KEY: "abcd-5678",
CONF_TRAVEL_TIMES: [{CONF_ID: "96", CONF_NAME: "I-90 EB"}],
},
],
ids=["with-int-id", "with-str-id"],
)
async def test_create_import_entry(
hass: HomeAssistant,
mock_travel_time: AsyncMock,
import_config: dict[str, str | int],
) -> None:
"""Test that the yaml import works."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=import_config,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "wsdot"
assert result["data"][CONF_API_KEY] == "abcd-5678"
entry = result["result"]
assert entry is not None
assert len(entry.subentries) == 1
subentry = next(iter(entry.subentries.values()))
assert subentry.subentry_type == SUBENTRY_TRAVEL_TIMES
assert subentry.title == "Seattle-Bellevue via I-90 (EB AM)"
assert subentry.data[CONF_NAME] == "Seattle-Bellevue via I-90 (EB AM)"
assert subentry.data[CONF_ID] == 96
@pytest.mark.parametrize(
("failed_travel_time_status", "abort_reason"),
[
(400, "invalid_api_key"),
(404, "cannot_connect"),
],
)
async def test_failed_import_entry(
hass: HomeAssistant,
mock_failed_travel_time: AsyncMock,
mock_config_data: dict[str, Any],
failed_travel_time_status: int,
abort_reason: str,
) -> None:
"""Test the failure modes of a yaml import."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=mock_config_data,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == abort_reason
async def test_incorrect_import_entry(
hass: HomeAssistant,
mock_travel_time: AsyncMock,
mock_config_data: dict[str, Any],
) -> None:
"""Test a yaml import of a non-existent route."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_API_KEY: "abcd-5678",
CONF_TRAVEL_TIMES: [{CONF_ID: "100001", CONF_NAME: "nowhere"}],
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "invalid_travel_time_id"
async def test_import_integration_already_exists(
hass: HomeAssistant,
mock_travel_time: AsyncMock,
mock_setup_entry: AsyncMock,
mock_config_entry: MockConfigEntry,
init_integration: MockConfigEntry,
) -> None:
"""Test we only allow one entry per API key."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_API_KEY: "abcd-1234",
CONF_TRAVEL_TIMES: [{CONF_ID: "100001", CONF_NAME: "nowhere"}],
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_integration_already_exists(
hass: HomeAssistant,
mock_travel_time: AsyncMock,
mock_config_entry: MockConfigEntry,
init_integration: MockConfigEntry,
) -> None:
"""Test we only allow one entry per API key."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert not result["errors"]
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=VALID_USER_CONFIG,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_travel_route_already_exists(
hass: HomeAssistant,
mock_travel_time: AsyncMock,
mock_config_entry: MockConfigEntry,
init_integration: MockConfigEntry,
) -> None:
"""Test we only allow choosing a travel time route once."""
result = await hass.config_entries.subentries.async_init(
(init_integration.entry_id, SUBENTRY_TRAVEL_TIMES),
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert not result["errors"]
result = await hass.config_entries.subentries.async_configure(
result["flow_id"],
user_input=VALID_USER_TRAVEL_TIME_CONFIG,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"

View File

@@ -0,0 +1,21 @@
"""The tests for the WSDOT platform."""
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from homeassistant.helpers import issue_registry as ir
from tests.common import MockConfigEntry
async def test_travel_sensor_setup_no_auth(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_failed_travel_time: None,
issue_registry: ir.IssueRegistry,
) -> None:
"""Test the wsdot Travel Time sensor does not create an entry with a bad API key."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR

View File

@@ -1,41 +1,84 @@
"""The tests for the WSDOT platform."""
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock
from homeassistant.components.wsdot.sensor import (
from syrupy.assertion import SnapshotAssertion
from homeassistant.components.wsdot.const import CONF_TRAVEL_TIMES, DOMAIN
from homeassistant.const import (
CONF_API_KEY,
CONF_ID,
CONF_NAME,
CONF_TRAVEL_TIMES,
DOMAIN,
CONF_PLATFORM,
Platform,
)
from homeassistant.const import CONF_PLATFORM
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er, issue_registry as ir
from homeassistant.setup import async_setup_component
config = {
CONF_API_KEY: "foo",
CONF_TRAVEL_TIMES: [{CONF_ID: 96, CONF_NAME: "I90 EB"}],
}
from tests.common import MockConfigEntry, snapshot_platform
async def test_setup_with_config(
hass: HomeAssistant, mock_travel_time: AsyncMock
async def test_travel_sensor_details(
hass: HomeAssistant,
mock_travel_time: AsyncMock,
init_integration: MockConfigEntry,
snapshot: SnapshotAssertion,
entity_registry: er.EntityRegistry,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the platform setup with configuration."""
assert await async_setup_component(
hass, "sensor", {"sensor": [{CONF_PLATFORM: DOMAIN, **config}]}
)
"""Test the wsdot Travel Time sensor details."""
await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id)
state = hass.states.get("sensor.i90_eb")
assert state is not None
assert state.name == "I90 EB"
assert state.state == "11"
assert (
state.attributes["Description"]
== "Downtown Seattle to Downtown Bellevue via I-90"
async def test_travel_sensor_platform_setup(
hass: HomeAssistant, mock_travel_time: AsyncMock, issue_registry: ir.IssueRegistry
) -> None:
"""Test the wsdot Travel Time sensor still supports setup from platform config."""
assert await async_setup_component(
hass,
Platform.SENSOR,
{
Platform.SENSOR: [
{
CONF_PLATFORM: DOMAIN,
CONF_API_KEY: "foo",
CONF_TRAVEL_TIMES: [{CONF_ID: 96, CONF_NAME: "I90 EB"}],
}
]
},
)
assert state.attributes["TimeUpdated"] == datetime(
2017, 1, 21, 15, 10, tzinfo=timezone(timedelta(hours=-8))
await hass.async_block_till_done()
entry = next(iter(hass.config_entries.async_entries(DOMAIN)), None)
assert entry is not None
assert entry.data[CONF_API_KEY] == "foo"
assert len(entry.subentries) == 1
assert len(issue_registry.issues) == 1
async def test_travel_sensor_platform_setup_bad_routes(
hass: HomeAssistant, mock_travel_time: AsyncMock, issue_registry: ir.IssueRegistry
) -> None:
"""Test the wsdot Travel Time sensor platform upgrade skips unknown route ids."""
assert await async_setup_component(
hass,
Platform.SENSOR,
{
Platform.SENSOR: [
{
CONF_PLATFORM: DOMAIN,
CONF_API_KEY: "foo",
CONF_TRAVEL_TIMES: [{CONF_ID: 4096, CONF_NAME: "Mars Expressway"}],
}
]
},
)
await hass.async_block_till_done()
entry = next(iter(hass.config_entries.async_entries(DOMAIN)), None)
assert entry is None
assert len(issue_registry.issues) == 1
issue = issue_registry.async_get_issue(
DOMAIN, "deprecated_yaml_import_issue_invalid_travel_time_id"
)
assert issue