Compare commits

..

13 Commits

Author SHA1 Message Date
jbouwh
29d6412410 Fix typo 2025-10-14 20:46:57 +00:00
jbouwh
46e54f7f78 Follow up on code review 2025-10-14 20:46:02 +00:00
jbouwh
4f70fa30cf Implement mixin class and add feature to maintain included entities from unique IDs 2025-10-09 20:01:51 +00:00
jbouwh
ee24acf52a Add included_entities attribute to base Entity class 2025-10-08 20:32:58 +00:00
hanwg
42a9d5d4e3 Add webhook tests for Telegram bot (#153998) 2025-10-08 20:58:15 +02:00
Maciej Bieniek
93fa162913 Update IQS for IMGW-PIB integration (#153870) 2025-10-08 20:30:05 +02:00
Maciej Bieniek
c432b1c8da Add entities for Shely cury component (#153918) 2025-10-08 20:26:29 +02:00
Artur Pragacz
00955b8e6a Fix empty llm api list in chat log (#153996) 2025-10-08 10:39:56 -05:00
Erik Montnemery
045b9d7f01 Correct homeassistant.helpers.trigger._trigger_action_wrapper (#153983) 2025-10-08 17:33:44 +02:00
Aaron Bach
438c4c7871 Limit SimpliSafe websocket connection attempts during startup (#153853)
Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
2025-10-08 16:32:17 +02:00
Thomas D
abc360460c Add diagnostics to Volvo integration (#153997) 2025-10-08 16:25:33 +02:00
HarvsG
26437bb253 Adds ConfigFlow for London Underground (#152050)
Co-authored-by: Norbert Rittel <norbert@rittel.de>
Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
2025-10-08 16:17:34 +02:00
epenet
56d953ac1e Use contants in climate set_temperature (#154008) 2025-10-08 16:15:01 +02:00
59 changed files with 2400 additions and 1528 deletions

2
CODEOWNERS generated
View File

@@ -753,8 +753,6 @@ build.json @home-assistant/supervisor
/tests/components/input_select/ @home-assistant/core
/homeassistant/components/input_text/ @home-assistant/core
/tests/components/input_text/ @home-assistant/core
/homeassistant/components/input_weekday/ @home-assistant/core
/tests/components/input_weekday/ @home-assistant/core
/homeassistant/components/insteon/ @teharris1
/tests/components/insteon/ @teharris1
/homeassistant/components/integration/ @dgomes

View File

@@ -231,7 +231,6 @@ DEFAULT_INTEGRATIONS = {
"input_datetime",
"input_number",
"input_select",
"input_weekday",
"input_text",
"schedule",
"timer",

View File

@@ -7,6 +7,8 @@ from typing import Any
from pyaprilaire.const import Attribute
from homeassistant.components.climate import (
ATTR_TARGET_TEMP_HIGH,
ATTR_TARGET_TEMP_LOW,
FAN_AUTO,
FAN_ON,
PRESET_AWAY,
@@ -16,7 +18,12 @@ from homeassistant.components.climate import (
HVACAction,
HVACMode,
)
from homeassistant.const import PRECISION_HALVES, PRECISION_WHOLE, UnitOfTemperature
from homeassistant.const import (
ATTR_TEMPERATURE,
PRECISION_HALVES,
PRECISION_WHOLE,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
@@ -232,15 +239,15 @@ class AprilaireClimate(BaseAprilaireEntity, ClimateEntity):
cool_setpoint = 0
heat_setpoint = 0
if temperature := kwargs.get("temperature"):
if temperature := kwargs.get(ATTR_TEMPERATURE):
if self.coordinator.data.get(Attribute.MODE) == 3:
cool_setpoint = temperature
else:
heat_setpoint = temperature
else:
if target_temp_low := kwargs.get("target_temp_low"):
if target_temp_low := kwargs.get(ATTR_TARGET_TEMP_LOW):
heat_setpoint = target_temp_low
if target_temp_high := kwargs.get("target_temp_high"):
if target_temp_high := kwargs.get(ATTR_TARGET_TEMP_HIGH):
cool_setpoint = target_temp_high
if cool_setpoint == 0 and heat_setpoint == 0:

View File

@@ -7,12 +7,14 @@ from typing import Any
from evolutionhttp import BryantEvolutionLocalClient
from homeassistant.components.climate import (
ATTR_TARGET_TEMP_HIGH,
ATTR_TARGET_TEMP_LOW,
ClimateEntity,
ClimateEntityFeature,
HVACAction,
HVACMode,
)
from homeassistant.const import UnitOfTemperature
from homeassistant.const import ATTR_TEMPERATURE, UnitOfTemperature
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import DeviceInfo
@@ -208,24 +210,24 @@ class BryantEvolutionClimate(ClimateEntity):
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperature."""
if kwargs.get("target_temp_high"):
temp = int(kwargs["target_temp_high"])
if value := kwargs.get(ATTR_TARGET_TEMP_HIGH):
temp = int(value)
if not await self._client.set_cooling_setpoint(temp):
raise HomeAssistantError(
translation_domain=DOMAIN, translation_key="failed_to_set_clsp"
)
self._attr_target_temperature_high = temp
if kwargs.get("target_temp_low"):
temp = int(kwargs["target_temp_low"])
if value := kwargs.get(ATTR_TARGET_TEMP_LOW):
temp = int(value)
if not await self._client.set_heating_setpoint(temp):
raise HomeAssistantError(
translation_domain=DOMAIN, translation_key="failed_to_set_htsp"
)
self._attr_target_temperature_low = temp
if kwargs.get("temperature"):
temp = int(kwargs["temperature"])
if value := kwargs.get(ATTR_TEMPERATURE):
temp = int(value)
fn = (
self._client.set_heating_setpoint
if self.hvac_mode == HVACMode.HEAT

View File

@@ -169,7 +169,7 @@ class CalendarEventListener:
def __init__(
self,
hass: HomeAssistant,
job: HassJob[..., Coroutine[Any, Any, None]],
job: HassJob[..., Coroutine[Any, Any, None] | Any],
trigger_data: dict[str, Any],
fetcher: QueuedEventFetcher,
) -> None:

View File

@@ -514,7 +514,7 @@ class ChatLog:
"""Set the LLM system prompt."""
llm_api: llm.APIInstance | None = None
if user_llm_hass_api is None:
if not user_llm_hass_api:
pass
elif isinstance(user_llm_hass_api, llm.API):
llm_api = await user_llm_hass_api.async_get_api_instance(llm_context)

View File

@@ -29,7 +29,12 @@ from homeassistant.components.climate import (
ClimateEntityFeature,
HVACMode,
)
from homeassistant.const import ATTR_MODE, PRECISION_TENTHS, UnitOfTemperature
from homeassistant.const import (
ATTR_MODE,
ATTR_TEMPERATURE,
PRECISION_TENTHS,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@@ -243,7 +248,7 @@ class EvoZone(EvoChild, EvoClimateEntity):
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set a new target temperature."""
temperature = kwargs["temperature"]
temperature = kwargs[ATTR_TEMPERATURE]
if (until := kwargs.get("until")) is None:
if self._evo_device.mode == EvoZoneMode.TEMPORARY_OVERRIDE:

View File

@@ -72,21 +72,15 @@ _TIME_TRIGGER_SCHEMA = vol.Any(
),
)
_WEEKDAY_SCHEMA = vol.Any(
vol.In(WEEKDAYS),
vol.All(cv.ensure_list, [vol.In(WEEKDAYS)]),
cv.entity_domain(["input_weekday"]),
msg=(
"Expected a weekday (mon, tue, wed, thu, fri, sat, sun), "
"a list of weekdays, or an Entity ID with domain 'input_weekday'"
),
)
TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
{
vol.Required(CONF_PLATFORM): "time",
vol.Required(CONF_AT): vol.All(cv.ensure_list, [_TIME_TRIGGER_SCHEMA]),
vol.Optional(CONF_WEEKDAY): _WEEKDAY_SCHEMA,
vol.Optional(CONF_WEEKDAY): vol.Any(
vol.In(WEEKDAYS),
vol.All(cv.ensure_list, [vol.In(WEEKDAYS)]),
),
}
)
@@ -123,14 +117,7 @@ async def async_attach_trigger( # noqa: C901
# Check if current weekday matches the configuration
if isinstance(weekday_config, str):
# Could be a single weekday string or an entity_id
if weekday_config.startswith("input_weekday."):
if (weekday_state := hass.states.get(weekday_config)) is None:
return
entity_weekdays = weekday_state.attributes.get("weekdays", [])
if current_weekday not in entity_weekdays:
return
elif current_weekday != weekday_config:
if current_weekday != weekday_config:
return
elif current_weekday not in weekday_config:
return

View File

@@ -5,6 +5,6 @@
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/imgw_pib",
"iot_class": "cloud_polling",
"quality_scale": "silver",
"quality_scale": "platinum",
"requirements": ["imgw_pib==1.5.6"]
}

View File

@@ -50,17 +50,17 @@ rules:
discovery:
status: exempt
comment: The integration is a cloud service and thus does not support discovery.
docs-data-update: todo
docs-examples: todo
docs-known-limitations: todo
docs-data-update: done
docs-examples: done
docs-known-limitations: done
docs-supported-devices:
status: exempt
comment: This is a service, which doesn't integrate with any devices.
docs-supported-functions: todo
docs-supported-functions: done
docs-troubleshooting:
status: exempt
comment: No known issues that could be resolved by the user.
docs-use-cases: todo
docs-use-cases: done
dynamic-devices:
status: exempt
comment: This integration has a fixed single service.

View File

@@ -1,285 +0,0 @@
"""Support to select weekdays for use in automation."""
from __future__ import annotations
import logging
from typing import Any, Self
import voluptuous as vol
from homeassistant.const import (
ATTR_EDITABLE,
CONF_ICON,
CONF_ID,
CONF_NAME,
SERVICE_RELOAD,
WEEKDAYS,
)
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.helpers import collection, config_validation as cv
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.restore_state import RestoreEntity
import homeassistant.helpers.service
from homeassistant.helpers.storage import Store
from homeassistant.helpers.typing import ConfigType, VolDictType
_LOGGER = logging.getLogger(__name__)
DOMAIN = "input_weekday"
CONF_WEEKDAYS = "weekdays"
ATTR_WEEKDAYS = "weekdays"
ATTR_WEEKDAY = "weekday"
SERVICE_SET_WEEKDAYS = "set_weekdays"
SERVICE_ADD_WEEKDAY = "add_weekday"
SERVICE_REMOVE_WEEKDAY = "remove_weekday"
SERVICE_TOGGLE_WEEKDAY = "toggle_weekday"
SERVICE_CLEAR = "clear"
STORAGE_KEY = DOMAIN
STORAGE_VERSION = 1
STORAGE_FIELDS: VolDictType = {
vol.Required(CONF_NAME): vol.All(str, vol.Length(min=1)),
vol.Optional(CONF_WEEKDAYS, default=list): vol.All(
cv.ensure_list, [vol.In(WEEKDAYS)]
),
vol.Optional(CONF_ICON): cv.icon,
}
def _cv_input_weekday(cfg: dict[str, Any]) -> dict[str, Any]:
"""Configure validation helper for input weekday (voluptuous)."""
if CONF_WEEKDAYS in cfg:
weekdays = cfg[CONF_WEEKDAYS]
# Remove duplicates while preserving order
cfg[CONF_WEEKDAYS] = list(dict.fromkeys(weekdays))
return cfg
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: cv.schema_with_slug_keys(
vol.All(
{
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_WEEKDAYS): vol.All(
cv.ensure_list, [vol.In(WEEKDAYS)]
),
vol.Optional(CONF_ICON): cv.icon,
},
_cv_input_weekday,
)
)
},
extra=vol.ALLOW_EXTRA,
)
RELOAD_SERVICE_SCHEMA = vol.Schema({})
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up an input weekday."""
component = EntityComponent[InputWeekday](_LOGGER, DOMAIN, hass)
id_manager = collection.IDManager()
yaml_collection = collection.YamlCollection(
logging.getLogger(f"{__name__}.yaml_collection"), id_manager
)
collection.sync_entity_lifecycle(
hass, DOMAIN, DOMAIN, component, yaml_collection, InputWeekday
)
storage_collection = InputWeekdayStorageCollection(
Store(hass, STORAGE_VERSION, STORAGE_KEY),
id_manager,
)
collection.sync_entity_lifecycle(
hass, DOMAIN, DOMAIN, component, storage_collection, InputWeekday
)
await yaml_collection.async_load(
[{CONF_ID: id_, **cfg} for id_, cfg in config.get(DOMAIN, {}).items()]
)
await storage_collection.async_load()
collection.DictStorageCollectionWebsocket(
storage_collection, DOMAIN, DOMAIN, STORAGE_FIELDS, STORAGE_FIELDS
).async_setup(hass)
async def reload_service_handler(service_call: ServiceCall) -> None:
"""Reload yaml entities."""
conf = await component.async_prepare_reload(skip_reset=True)
if conf is None:
conf = {DOMAIN: {}}
await yaml_collection.async_load(
[{CONF_ID: id_, **cfg} for id_, cfg in conf.get(DOMAIN, {}).items()]
)
homeassistant.helpers.service.async_register_admin_service(
hass,
DOMAIN,
SERVICE_RELOAD,
reload_service_handler,
schema=RELOAD_SERVICE_SCHEMA,
)
component.async_register_entity_service(
SERVICE_SET_WEEKDAYS,
{vol.Required(ATTR_WEEKDAYS): vol.All(cv.ensure_list, [vol.In(WEEKDAYS)])},
"async_set_weekdays",
)
component.async_register_entity_service(
SERVICE_ADD_WEEKDAY,
{vol.Required(ATTR_WEEKDAY): vol.In(WEEKDAYS)},
"async_add_weekday",
)
component.async_register_entity_service(
SERVICE_REMOVE_WEEKDAY,
{vol.Required(ATTR_WEEKDAY): vol.In(WEEKDAYS)},
"async_remove_weekday",
)
component.async_register_entity_service(
SERVICE_TOGGLE_WEEKDAY,
{vol.Required(ATTR_WEEKDAY): vol.In(WEEKDAYS)},
"async_toggle_weekday",
)
component.async_register_entity_service(
SERVICE_CLEAR,
None,
"async_clear",
)
return True
class InputWeekdayStorageCollection(collection.DictStorageCollection):
"""Input weekday storage based collection."""
CREATE_UPDATE_SCHEMA = vol.Schema(vol.All(STORAGE_FIELDS, _cv_input_weekday))
async def _process_create_data(self, data: dict[str, Any]) -> dict[str, Any]:
"""Validate the config is valid."""
return self.CREATE_UPDATE_SCHEMA(data)
@callback
def _get_suggested_id(self, info: dict[str, Any]) -> str:
"""Suggest an ID based on the config."""
return info[CONF_NAME]
async def _update_data(
self, item: dict[str, Any], update_data: dict[str, Any]
) -> dict[str, Any]:
"""Return a new updated data object."""
update_data = self.CREATE_UPDATE_SCHEMA(update_data)
return item | update_data
# pylint: disable-next=hass-enforce-class-module
class InputWeekday(collection.CollectionEntity, RestoreEntity):
"""Representation of a weekday input."""
_unrecorded_attributes = frozenset({ATTR_EDITABLE})
_attr_should_poll = False
editable: bool
def __init__(self, config: ConfigType) -> None:
"""Initialize a weekday input."""
self._config = config
self._attr_weekdays = config.get(CONF_WEEKDAYS, [])
self._attr_unique_id = config[CONF_ID]
@classmethod
def from_storage(cls, config: ConfigType) -> Self:
"""Return entity instance initialized from storage."""
input_weekday = cls(config)
input_weekday.editable = True
return input_weekday
@classmethod
def from_yaml(cls, config: ConfigType) -> Self:
"""Return entity instance initialized from yaml."""
input_weekday = cls(config)
input_weekday.entity_id = f"{DOMAIN}.{config[CONF_ID]}"
input_weekday.editable = False
return input_weekday
@property
def name(self) -> str:
"""Return name of the weekday input."""
return self._config.get(CONF_NAME) or self._config[CONF_ID]
@property
def icon(self) -> str | None:
"""Return the icon to be used for this entity."""
return self._config.get(CONF_ICON)
@property
def state(self) -> str:
"""Return the state of the entity."""
# Return a comma-separated string of selected weekdays
return ",".join(self._attr_weekdays) if self._attr_weekdays else ""
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the entity."""
return {
ATTR_WEEKDAYS: self._attr_weekdays,
ATTR_EDITABLE: self.editable,
}
async def async_added_to_hass(self) -> None:
"""Call when entity about to be added to hass."""
await super().async_added_to_hass()
# Restore previous state if no initial weekdays were provided
if self._config.get(CONF_WEEKDAYS) is not None:
return
state = await self.async_get_last_state()
if state is not None and ATTR_WEEKDAYS in state.attributes:
self._attr_weekdays = state.attributes[ATTR_WEEKDAYS]
async def async_set_weekdays(self, weekdays: list[str]) -> None:
"""Set the selected weekdays."""
# Remove duplicates while preserving order
self._attr_weekdays = list(dict.fromkeys(weekdays))
self.async_write_ha_state()
async def async_add_weekday(self, weekday: str) -> None:
"""Add a weekday to the selection."""
if weekday not in self._attr_weekdays:
self._attr_weekdays.append(weekday)
self.async_write_ha_state()
async def async_remove_weekday(self, weekday: str) -> None:
"""Remove a weekday from the selection."""
if weekday in self._attr_weekdays:
self._attr_weekdays.remove(weekday)
self.async_write_ha_state()
async def async_toggle_weekday(self, weekday: str) -> None:
"""Toggle a weekday in the selection."""
if weekday in self._attr_weekdays:
self._attr_weekdays.remove(weekday)
else:
self._attr_weekdays.append(weekday)
self.async_write_ha_state()
async def async_clear(self) -> None:
"""Clear all selected weekdays."""
self._attr_weekdays = []
self.async_write_ha_state()
async def async_update_config(self, config: ConfigType) -> None:
"""Handle when the config is updated."""
self._config = config
self._attr_weekdays = config.get(CONF_WEEKDAYS, [])
self.async_write_ha_state()

View File

@@ -1,29 +0,0 @@
{
"entity": {
"input_weekday": {
"default": {
"default": "mdi:calendar-week"
}
}
},
"services": {
"set_weekdays": {
"service": "mdi:calendar-edit"
},
"add_weekday": {
"service": "mdi:calendar-plus"
},
"remove_weekday": {
"service": "mdi:calendar-minus"
},
"toggle_weekday": {
"service": "mdi:calendar-check"
},
"clear": {
"service": "mdi:calendar-remove"
},
"reload": {
"service": "mdi:reload"
}
}
}

View File

@@ -1,8 +0,0 @@
{
"domain": "input_weekday",
"name": "Input Weekday",
"codeowners": ["@home-assistant/core"],
"documentation": "https://www.home-assistant.io/integrations/input_weekday",
"integration_type": "helper",
"quality_scale": "internal"
}

View File

@@ -1,42 +0,0 @@
"""Reproduce an Input Weekday state."""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import Context, HomeAssistant, State
from . import ATTR_WEEKDAYS, DOMAIN, SERVICE_SET_WEEKDAYS
_LOGGER = logging.getLogger(__name__)
async def async_reproduce_states(
hass: HomeAssistant,
states: list[State],
*,
context: Context | None = None,
reproduce_options: dict[str, Any] | None = None,
) -> None:
"""Reproduce Input Weekday states."""
for state in states:
if ATTR_WEEKDAYS not in state.attributes:
_LOGGER.warning(
"Unable to reproduce state for %s: %s attribute is missing",
state.entity_id,
ATTR_WEEKDAYS,
)
continue
weekdays = state.attributes[ATTR_WEEKDAYS]
service_data = {
ATTR_ENTITY_ID: state.entity_id,
ATTR_WEEKDAYS: weekdays,
}
await hass.services.async_call(
DOMAIN, SERVICE_SET_WEEKDAYS, service_data, context=context, blocking=True
)

View File

@@ -1,115 +0,0 @@
set_weekdays:
target:
entity:
domain: input_weekday
fields:
weekdays:
required: true
example: '["mon", "wed", "fri"]'
selector:
select:
multiple: true
mode: list
options:
- value: mon
label: Monday
- value: tue
label: Tuesday
- value: wed
label: Wednesday
- value: thu
label: Thursday
- value: fri
label: Friday
- value: sat
label: Saturday
- value: sun
label: Sunday
add_weekday:
target:
entity:
domain: input_weekday
fields:
weekday:
required: true
example: mon
selector:
select:
mode: dropdown
options:
- value: mon
label: Monday
- value: tue
label: Tuesday
- value: wed
label: Wednesday
- value: thu
label: Thursday
- value: fri
label: Friday
- value: sat
label: Saturday
- value: sun
label: Sunday
remove_weekday:
target:
entity:
domain: input_weekday
fields:
weekday:
required: true
example: mon
selector:
select:
mode: dropdown
options:
- value: mon
label: Monday
- value: tue
label: Tuesday
- value: wed
label: Wednesday
- value: thu
label: Thursday
- value: fri
label: Friday
- value: sat
label: Saturday
- value: sun
label: Sunday
toggle_weekday:
target:
entity:
domain: input_weekday
fields:
weekday:
required: true
example: mon
selector:
select:
mode: dropdown
options:
- value: mon
label: Monday
- value: tue
label: Tuesday
- value: wed
label: Wednesday
- value: thu
label: Thursday
- value: fri
label: Friday
- value: sat
label: Saturday
- value: sun
label: Sunday
clear:
target:
entity:
domain: input_weekday
reload:

View File

@@ -1,70 +0,0 @@
{
"title": "Input Weekday",
"entity_component": {
"_": {
"name": "[%key:component::input_weekday::title%]",
"state_attributes": {
"weekdays": {
"name": "Weekdays"
},
"editable": {
"name": "[%key:common::generic::ui_managed%]",
"state": {
"true": "[%key:common::state::yes%]",
"false": "[%key:common::state::no%]"
}
}
}
}
},
"services": {
"set_weekdays": {
"name": "Set weekdays",
"description": "Sets the selected weekdays.",
"fields": {
"weekdays": {
"name": "Weekdays",
"description": "List of weekdays to select."
}
}
},
"add_weekday": {
"name": "Add weekday",
"description": "Adds a weekday to the selection.",
"fields": {
"weekday": {
"name": "Weekday",
"description": "Weekday to add."
}
}
},
"remove_weekday": {
"name": "Remove weekday",
"description": "Removes a weekday from the selection.",
"fields": {
"weekday": {
"name": "Weekday",
"description": "Weekday to remove."
}
}
},
"toggle_weekday": {
"name": "Toggle weekday",
"description": "Toggles a weekday in the selection.",
"fields": {
"weekday": {
"name": "Weekday",
"description": "Weekday to toggle."
}
}
},
"clear": {
"name": "Clear",
"description": "Clears all selected weekdays."
},
"reload": {
"name": "[%key:common::action::reload%]",
"description": "Reloads helpers from the YAML-configuration."
}
}
}

View File

@@ -1 +1,36 @@
"""The london_underground component."""
from __future__ import annotations
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN as DOMAIN
from .coordinator import LondonTubeCoordinator, LondonUndergroundConfigEntry, TubeData
PLATFORMS: list[Platform] = [Platform.SENSOR]
async def async_setup_entry(
hass: HomeAssistant, entry: LondonUndergroundConfigEntry
) -> bool:
"""Set up London Underground from a config entry."""
session = async_get_clientsession(hass)
data = TubeData(session)
coordinator = LondonTubeCoordinator(hass, data, config_entry=entry)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
# Forward the setup to the sensor platform
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(
hass: HomeAssistant, entry: LondonUndergroundConfigEntry
) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -0,0 +1,152 @@
"""Config flow for London Underground integration."""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from london_tube_status import TubeData
import voluptuous as vol
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
OptionsFlowWithReload,
)
from homeassistant.core import callback
from homeassistant.helpers import selector
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import ConfigType
from .const import CONF_LINE, DEFAULT_LINES, DOMAIN, TUBE_LINES
_LOGGER = logging.getLogger(__name__)
class LondonUndergroundConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for London Underground."""
VERSION = 1
MINOR_VERSION = 1
@staticmethod
@callback
def async_get_options_flow(
_: ConfigEntry,
) -> LondonUndergroundOptionsFlow:
"""Get the options flow for this handler."""
return LondonUndergroundOptionsFlow()
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input is not None:
session = async_get_clientsession(self.hass)
data = TubeData(session)
try:
async with asyncio.timeout(10):
await data.update()
except TimeoutError:
errors["base"] = "timeout_connect"
except Exception:
_LOGGER.exception("Unexpected error")
errors["base"] = "cannot_connect"
else:
return self.async_create_entry(
title="London Underground",
data={},
options={CONF_LINE: user_input.get(CONF_LINE, DEFAULT_LINES)},
)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Optional(
CONF_LINE,
default=DEFAULT_LINES,
): selector.SelectSelector(
selector.SelectSelectorConfig(
options=TUBE_LINES,
multiple=True,
mode=selector.SelectSelectorMode.DROPDOWN,
)
),
}
),
errors=errors,
)
async def async_step_import(self, import_data: ConfigType) -> ConfigFlowResult:
"""Handle import from configuration.yaml."""
session = async_get_clientsession(self.hass)
data = TubeData(session)
try:
async with asyncio.timeout(10):
await data.update()
except Exception:
_LOGGER.exception(
"Unexpected error trying to connect before importing config, aborting import "
)
return self.async_abort(reason="cannot_connect")
_LOGGER.warning(
"Importing London Underground config from configuration.yaml: %s",
import_data,
)
# Extract lines from the sensor platform config
lines = import_data.get(CONF_LINE, DEFAULT_LINES)
if "London Overground" in lines:
_LOGGER.warning(
"London Overground was removed from the configuration as the line has been divided and renamed"
)
lines.remove("London Overground")
return self.async_create_entry(
title="London Underground",
data={},
options={CONF_LINE: import_data.get(CONF_LINE, DEFAULT_LINES)},
)
class LondonUndergroundOptionsFlow(OptionsFlowWithReload):
"""Handle options."""
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Manage the options."""
if user_input is not None:
_LOGGER.debug(
"Updating london underground with options flow user_input: %s",
user_input,
)
return self.async_create_entry(
title="",
data={CONF_LINE: user_input[CONF_LINE]},
)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(
{
vol.Optional(
CONF_LINE,
default=self.config_entry.options.get(
CONF_LINE,
self.config_entry.data.get(CONF_LINE, DEFAULT_LINES),
),
): selector.SelectSelector(
selector.SelectSelectorConfig(
options=TUBE_LINES,
multiple=True,
mode=selector.SelectSelectorMode.DROPDOWN,
)
),
}
),
)

View File

@@ -6,7 +6,6 @@ DOMAIN = "london_underground"
CONF_LINE = "line"
SCAN_INTERVAL = timedelta(seconds=30)
TUBE_LINES = [
@@ -18,7 +17,7 @@ TUBE_LINES = [
"Elizabeth line",
"Hammersmith & City",
"Jubilee",
"London Overground",
"London Overground", # no longer supported
"Metropolitan",
"Northern",
"Piccadilly",
@@ -31,3 +30,20 @@ TUBE_LINES = [
"Weaver",
"Windrush",
]
# Default lines to monitor if none selected
DEFAULT_LINES = [
"Bakerloo",
"Central",
"Circle",
"District",
"DLR",
"Elizabeth line",
"Hammersmith & City",
"Jubilee",
"Metropolitan",
"Northern",
"Piccadilly",
"Victoria",
"Waterloo & City",
]

View File

@@ -8,6 +8,7 @@ from typing import cast
from london_tube_status import TubeData
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
@@ -15,16 +16,23 @@ from .const import DOMAIN, SCAN_INTERVAL
_LOGGER = logging.getLogger(__name__)
type LondonUndergroundConfigEntry = ConfigEntry[LondonTubeCoordinator]
class LondonTubeCoordinator(DataUpdateCoordinator[dict[str, dict[str, str]]]):
"""London Underground sensor coordinator."""
def __init__(self, hass: HomeAssistant, data: TubeData) -> None:
def __init__(
self,
hass: HomeAssistant,
data: TubeData,
config_entry: LondonUndergroundConfigEntry,
) -> None:
"""Initialize coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=None,
config_entry=config_entry,
name=DOMAIN,
update_interval=SCAN_INTERVAL,
)

View File

@@ -2,9 +2,12 @@
"domain": "london_underground",
"name": "London Underground",
"codeowners": ["@jpbede"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/london_underground",
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["london_tube_status"],
"quality_scale": "legacy",
"requirements": ["london-tube-status==0.5"]
"requirements": ["london-tube-status==0.5"],
"single_config_entry": true
}

View File

@@ -5,23 +5,26 @@ from __future__ import annotations
import logging
from typing import Any
from london_tube_status import TubeData
import voluptuous as vol
from homeassistant.components.sensor import (
PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
SensorEntity,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import config_validation as cv, issue_registry as ir
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import (
AddConfigEntryEntitiesCallback,
AddEntitiesCallback,
)
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import CONF_LINE, TUBE_LINES
from .coordinator import LondonTubeCoordinator
from .const import CONF_LINE, DOMAIN, TUBE_LINES
from .coordinator import LondonTubeCoordinator, LondonUndergroundConfigEntry
_LOGGER = logging.getLogger(__name__)
@@ -38,18 +41,54 @@ async def async_setup_platform(
) -> None:
"""Set up the Tube sensor."""
session = async_get_clientsession(hass)
# If configuration.yaml config exists, trigger the import flow.
# If the config entry already exists, this will not be triggered as only one config is allowed.
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=config
)
if (
result.get("type") is FlowResultType.ABORT
and result.get("reason") != "already_configured"
):
ir.async_create_issue(
hass,
DOMAIN,
f"deprecated_yaml_import_issue_{result.get('reason')}",
is_fixable=False,
issue_domain=DOMAIN,
severity=ir.IssueSeverity.WARNING,
translation_key="deprecated_yaml_import_issue",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "London Underground",
},
)
return
data = TubeData(session)
coordinator = LondonTubeCoordinator(hass, data)
ir.async_create_issue(
hass,
HOMEASSISTANT_DOMAIN,
"deprecated_yaml",
is_fixable=False,
issue_domain=DOMAIN,
severity=ir.IssueSeverity.WARNING,
translation_key="deprecated_yaml",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "London Underground",
},
)
await coordinator.async_refresh()
if not coordinator.last_update_success:
raise PlatformNotReady
async def async_setup_entry(
hass: HomeAssistant,
entry: LondonUndergroundConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the London Underground sensor from config entry."""
async_add_entities(
LondonTubeSensor(coordinator, line) for line in config[CONF_LINE]
LondonTubeSensor(entry.runtime_data, line) for line in entry.options[CONF_LINE]
)
@@ -58,11 +97,21 @@ class LondonTubeSensor(CoordinatorEntity[LondonTubeCoordinator], SensorEntity):
_attr_attribution = "Powered by TfL Open Data"
_attr_icon = "mdi:subway"
_attr_has_entity_name = True # Use modern entity naming
def __init__(self, coordinator: LondonTubeCoordinator, name: str) -> None:
"""Initialize the London Underground sensor."""
super().__init__(coordinator)
self._name = name
# Add unique_id for proper entity registry
self._attr_unique_id = f"tube_{name.lower().replace(' ', '_')}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, DOMAIN)},
name="London Underground",
manufacturer="Transport for London",
model="Tube Status",
entry_type=DeviceEntryType.SERVICE,
)
@property
def name(self) -> str:

View File

@@ -0,0 +1,38 @@
{
"config": {
"step": {
"user": {
"title": "Set up London Underground",
"description": "Select which tube lines you want to monitor",
"data": {
"line": "Tube lines"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"timeout_connect": "[%key:common::config_flow::error::timeout_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]"
}
},
"options": {
"step": {
"init": {
"title": "Configure London Underground",
"description": "[%key:component::london_underground::config::step::user::description%]",
"data": {
"line": "[%key:component::london_underground::config::step::user::data::line%]"
}
}
}
},
"issues": {
"deprecated_yaml_import_issue": {
"title": "London Underground YAML configuration deprecated",
"description": "Configuring London Underground using YAML sensor platform is deprecated.\n\nWhile importing your configuration, an error occurred when trying to connect to the Transport for London API. Please restart Home Assistant to try again, or remove the existing YAML configuration and set the integration up via the UI."
}
}
}

View File

@@ -408,5 +408,5 @@ class AtwDeviceZoneClimate(MelCloudClimate):
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperature."""
await self._zone.set_target_temperature(
kwargs.get("temperature", self.target_temperature)
kwargs.get(ATTR_TEMPERATURE, self.target_temperature)
)

View File

@@ -50,8 +50,14 @@
"valve_status": {
"default": "mdi:valve"
},
"vial_name": {
"default": "mdi:scent"
},
"illuminance_level": {
"default": "mdi:brightness-5"
},
"vial_level": {
"default": "mdi:bottle-tonic-outline"
}
},
"switch": {
@@ -61,6 +67,13 @@
"off": "mdi:valve-closed",
"on": "mdi:valve-open"
}
},
"cury_slot": {
"default": "mdi:scent",
"state": {
"off": "mdi:scent-off",
"on": "mdi:scent"
}
}
}
}

View File

@@ -72,6 +72,7 @@ class RpcNumberDescription(RpcEntityDescription, NumberEntityDescription):
min_fn: Callable[[dict], float] | None = None
step_fn: Callable[[dict], float] | None = None
mode_fn: Callable[[dict], NumberMode] | None = None
slot: str | None = None
method: str
@@ -121,6 +122,22 @@ class RpcNumber(ShellyRpcAttributeEntity, NumberEntity):
await method(self._id, value)
class RpcCuryIntensityNumber(RpcNumber):
"""Represent a RPC Cury Intensity entity."""
@rpc_call
async def async_set_native_value(self, value: float) -> None:
"""Change the value."""
method = getattr(self.coordinator.device, self.entity_description.method)
if TYPE_CHECKING:
assert method is not None
await method(
self._id, slot=self.entity_description.slot, intensity=round(value)
)
class RpcBluTrvNumber(RpcNumber):
"""Represent a RPC BluTrv number."""
@@ -274,6 +291,38 @@ RPC_NUMBERS: Final = {
is True,
entity_class=RpcBluTrvNumber,
),
"left_slot_intensity": RpcNumberDescription(
key="cury",
sub_key="slots",
name="Left slot intensity",
value=lambda status, _: status["left"]["intensity"],
native_min_value=0,
native_max_value=100,
native_step=1,
mode=NumberMode.SLIDER,
native_unit_of_measurement=PERCENTAGE,
method="cury_set",
slot="left",
available=lambda status: (left := status["left"]) is not None
and left.get("vial", {}).get("level", -1) != -1,
entity_class=RpcCuryIntensityNumber,
),
"right_slot_intensity": RpcNumberDescription(
key="cury",
sub_key="slots",
name="Right slot intensity",
value=lambda status, _: status["right"]["intensity"],
native_min_value=0,
native_max_value=100,
native_step=1,
mode=NumberMode.SLIDER,
native_unit_of_measurement=PERCENTAGE,
method="cury_set",
slot="right",
available=lambda status: (right := status["right"]) is not None
and right.get("vial", {}).get("level", -1) != -1,
entity_class=RpcCuryIntensityNumber,
),
}

View File

@@ -1658,6 +1658,50 @@ RPC_SENSORS: Final = {
state_class=SensorStateClass.MEASUREMENT,
role="phase_info",
),
"cury_left_level": RpcSensorDescription(
key="cury",
sub_key="slots",
name="Left slot level",
translation_key="vial_level",
value=lambda status, _: status["left"]["vial"]["level"],
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
entity_category=EntityCategory.DIAGNOSTIC,
available=lambda status: (left := status["left"]) is not None
and left.get("vial", {}).get("level", -1) != -1,
),
"cury_left_vial": RpcSensorDescription(
key="cury",
sub_key="slots",
name="Left slot vial",
translation_key="vial_name",
value=lambda status, _: status["left"]["vial"]["name"],
entity_category=EntityCategory.DIAGNOSTIC,
available=lambda status: (left := status["left"]) is not None
and left.get("vial", {}).get("level", -1) != -1,
),
"cury_right_level": RpcSensorDescription(
key="cury",
sub_key="slots",
name="Right slot level",
translation_key="vial_level",
value=lambda status, _: status["right"]["vial"]["level"],
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
entity_category=EntityCategory.DIAGNOSTIC,
available=lambda status: (right := status["right"]) is not None
and right.get("vial", {}).get("level", -1) != -1,
),
"cury_right_vial": RpcSensorDescription(
key="cury",
sub_key="slots",
name="Right slot vial",
translation_key="vial_name",
value=lambda status, _: status["right"]["vial"]["name"],
entity_category=EntityCategory.DIAGNOSTIC,
available=lambda status: (right := status["right"]) is not None
and right.get("vial", {}).get("level", -1) != -1,
),
}

View File

@@ -230,6 +230,32 @@ RPC_SWITCHES = {
entity_registry_enabled_default=False,
entity_category=EntityCategory.CONFIG,
),
"cury_left": RpcSwitchDescription(
key="cury",
sub_key="slots",
name="Left slot",
translation_key="cury_slot",
is_on=lambda status: bool(status["slots"]["left"]["on"]),
method_on="cury_set",
method_off="cury_set",
method_params_fn=lambda id, value: (id, "left", value),
entity_registry_enabled_default=True,
available=lambda status: (left := status["left"]) is not None
and left.get("vial", {}).get("level", -1) != -1,
),
"cury_right": RpcSwitchDescription(
key="cury",
sub_key="slots",
name="Right slot",
translation_key="cury_slot",
is_on=lambda status: bool(status["slots"]["right"]["on"]),
method_on="cury_set",
method_off="cury_set",
method_params_fn=lambda id, value: (id, "right", value),
entity_registry_enabled_default=True,
available=lambda status: (right := status["right"]) is not None
and right.get("vial", {}).get("level", -1) != -1,
),
}

View File

@@ -100,8 +100,9 @@ ATTR_PIN_VALUE = "pin"
ATTR_TIMESTAMP = "timestamp"
DEFAULT_SCAN_INTERVAL = timedelta(seconds=30)
DEFAULT_SOCKET_MIN_RETRY = 15
WEBSOCKET_RECONNECT_RETRIES = 3
WEBSOCKET_RETRY_DELAY = 2
EVENT_SIMPLISAFE_EVENT = "SIMPLISAFE_EVENT"
EVENT_SIMPLISAFE_NOTIFICATION = "SIMPLISAFE_NOTIFICATION"
@@ -419,6 +420,7 @@ class SimpliSafe:
self._api = api
self._hass = hass
self._system_notifications: dict[int, set[SystemNotification]] = {}
self._websocket_reconnect_retries: int = 0
self._websocket_reconnect_task: asyncio.Task | None = None
self.entry = entry
self.initial_event_to_use: dict[int, dict[str, Any]] = {}
@@ -469,6 +471,8 @@ class SimpliSafe:
"""Start a websocket reconnection loop."""
assert self._api.websocket
self._websocket_reconnect_retries += 1
try:
await self._api.websocket.async_connect()
await self._api.websocket.async_listen()
@@ -479,9 +483,21 @@ class SimpliSafe:
LOGGER.error("Failed to connect to websocket: %s", err)
except Exception as err: # noqa: BLE001
LOGGER.error("Unknown exception while connecting to websocket: %s", err)
else:
self._websocket_reconnect_retries = 0
LOGGER.debug("Reconnecting to websocket")
await self._async_cancel_websocket_loop()
if self._websocket_reconnect_retries >= WEBSOCKET_RECONNECT_RETRIES:
LOGGER.error("Max websocket connection retries exceeded")
return
delay = WEBSOCKET_RETRY_DELAY * (2 ** (self._websocket_reconnect_retries - 1))
LOGGER.info(
"Retrying websocket connection in %s seconds (attempt %s/%s)",
delay,
self._websocket_reconnect_retries,
WEBSOCKET_RECONNECT_RETRIES,
)
await asyncio.sleep(delay)
self._websocket_reconnect_task = self._hass.async_create_task(
self._async_start_websocket_loop()
)

View File

@@ -18,7 +18,7 @@ from homeassistant.components.climate import (
ClimateEntityFeature,
HVACMode,
)
from homeassistant.const import UnitOfTemperature
from homeassistant.const import ATTR_TEMPERATURE, UnitOfTemperature
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
@@ -352,7 +352,7 @@ class TuyaClimateEntity(TuyaEntity, ClimateEntity):
{
"code": self._set_temperature.dpcode,
"value": round(
self._set_temperature.scale_value_back(kwargs["temperature"])
self._set_temperature.scale_value_back(kwargs[ATTR_TEMPERATURE])
),
}
]

View File

@@ -0,0 +1,45 @@
"""Volvo diagnostics."""
from dataclasses import asdict
from typing import Any
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_API_KEY
from homeassistant.core import HomeAssistant
from homeassistant.helpers.redact import async_redact_data
from .const import CONF_VIN
from .coordinator import VolvoConfigEntry
_TO_REDACT_ENTRY = [
CONF_ACCESS_TOKEN,
CONF_API_KEY,
CONF_VIN,
"id_token",
"refresh_token",
]
_TO_REDACT_DATA = [
"coordinates",
"heading",
"vin",
]
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: VolvoConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
context = entry.runtime_data.interval_coordinators[0].context
data: dict[str, dict] = {}
for coordinator in entry.runtime_data.interval_coordinators:
data[coordinator.name] = {
key: async_redact_data(asdict(value), _TO_REDACT_DATA) if value else None
for key, value in coordinator.data.items()
}
return {
"entry_data": async_redact_data(entry.data, _TO_REDACT_ENTRY),
"vehicle": async_redact_data(asdict(context.vehicle), _TO_REDACT_DATA),
**data,
}

View File

@@ -367,6 +367,7 @@ FLOWS = {
"local_ip",
"local_todo",
"locative",
"london_underground",
"lookin",
"loqed",
"luftdaten",

View File

@@ -3688,9 +3688,10 @@
},
"london_underground": {
"name": "London Underground",
"integration_type": "hub",
"config_flow": false,
"iot_class": "cloud_polling"
"integration_type": "service",
"config_flow": true,
"iot_class": "cloud_polling",
"single_config_entry": true
},
"lookin": {
"name": "LOOKin",
@@ -7922,10 +7923,6 @@
"integration_type": "helper",
"config_flow": false
},
"input_weekday": {
"integration_type": "helper",
"config_flow": false
},
"integration": {
"integration_type": "helper",
"config_flow": true,
@@ -8025,7 +8022,6 @@
"input_number",
"input_select",
"input_text",
"input_weekday",
"integration",
"irm_kmi",
"islamic_prayer_times",

View File

@@ -954,25 +954,11 @@ def time(
if weekday is not None:
now_weekday = WEEKDAYS[now.weekday()]
# Check if weekday is an entity_id
if isinstance(weekday, str) and weekday.startswith("input_weekday."):
if (weekday_state := hass.states.get(weekday)) is None:
condition_trace_update_result(weekday=weekday, now_weekday=now_weekday)
return False
entity_weekdays = weekday_state.attributes.get("weekdays", [])
condition_trace_update_result(
weekday=weekday,
now_weekday=now_weekday,
entity_weekdays=entity_weekdays,
)
if now_weekday not in entity_weekdays:
return False
else:
condition_trace_update_result(weekday=weekday, now_weekday=now_weekday)
if (
isinstance(weekday, str) and weekday != now_weekday
) or now_weekday not in weekday:
return False
condition_trace_update_result(weekday=weekday, now_weekday=now_weekday)
if (
isinstance(weekday, str) and weekday != now_weekday
) or now_weekday not in weekday:
return False
return True

View File

@@ -843,10 +843,7 @@ def time_zone(value: str) -> str:
)
weekdays = vol.Any(
vol.All(ensure_list, [vol.In(WEEKDAYS)]),
entity_domain(["input_weekday"]),
)
weekdays = vol.All(ensure_list, [vol.In(WEEKDAYS)])
def socket_timeout(value: Any | None) -> object:

View File

@@ -1699,3 +1699,90 @@ class ToggleEntity(
await self.async_turn_off(**kwargs)
else:
await self.async_turn_on(**kwargs)
class IncludedEntitiesMixin(Entity):
"""Mixin class to include entities that are contained.
Integrations can include the this Mixin class to
include the `entity_id` state attribute.
"""
_attr_included_entities: list[str]
_attr_included_unique_ids: list[str]
__initialized: bool = False
@callback
def async_set_included_entities(
self, integration_domain: str, unique_ids: list[str]
) -> None:
"""Set the list of included entities identified by their unique IDs.
Integrations need to initialize this in entity.async_async_added_to_hass,
and when the list of included entities changes.
The entity ids of included entities will will be looked up and they will be
tracked for changes.
None existing entities for the supplied unique IDs will be ignored.
"""
self._integration_domain = integration_domain
self._attr_included_unique_ids = unique_ids
self._monitor_member_updates()
@property
def included_unique_ids(self) -> list[str]:
"""Return the list of unique IDs if the entity represents a group.
The corresponding entities will be shown as members in the UI.
"""
if hasattr(self, "_attr_included_unique_ids"):
return self._attr_included_unique_ids
return []
@property
def included_entities(self) -> list[str] | None:
"""Return a list of entity IDs if the entity represents a group.
Included entities will be shown as members in the UI.
"""
if hasattr(self, "_attr_included_entities"):
return self._attr_included_entities
return None
@callback
def _monitor_member_updates(self) -> None:
"""Update the group members if the entity registry is updated."""
entity_registry = er.async_get(self.hass)
assert self.entity_id is not None
platform_domain = self.entity_id.split(".")[0]
def _update_group_entity_ids() -> None:
self._attr_included_entities = []
for included_id in self.included_unique_ids:
if entity_id := entity_registry.async_get_entity_id(
platform_domain, self._integration_domain, included_id
):
self._attr_included_entities.append(entity_id)
async def _handle_entity_registry_updated(event: Event[Any]) -> None:
"""Handle registry create or update event."""
if (
event.data["action"] in {"create", "update"}
and (entry := entity_registry.async_get(event.data["entity_id"]))
and entry.unique_id in self.included_unique_ids
) or (
event.data["action"] == "remove"
and self.included_entities is not None
and event.data["entity_id"] in self.included_entities
):
_update_group_entity_ids()
self.async_write_ha_state()
if not self.__initialized:
self.async_on_remove(
self.hass.bus.async_listen(
er.EVENT_ENTITY_REGISTRY_UPDATED,
_handle_entity_registry_updated,
)
)
self.__initialized = True
_update_group_entity_ids()

View File

@@ -260,11 +260,11 @@ class TriggerConfig:
class TriggerActionType(Protocol):
"""Protocol type for trigger action callback."""
async def __call__(
def __call__(
self,
run_variables: dict[str, Any],
context: Context | None = None,
) -> Any:
) -> Coroutine[Any, Any, Any] | Any:
"""Define action callback type."""
@@ -294,7 +294,7 @@ class PluggableActionsEntry:
actions: dict[
object,
tuple[
HassJob[[dict[str, Any], Context | None], Coroutine[Any, Any, None]],
HassJob[[dict[str, Any], Context | None], Coroutine[Any, Any, None] | Any],
dict[str, Any],
],
] = field(default_factory=dict)
@@ -477,7 +477,7 @@ def _trigger_action_wrapper(
else:
@functools.wraps(action)
async def with_vars(
def with_vars(
run_variables: dict[str, Any], context: Context | None = None
) -> Any:
"""Wrap action with extra vars."""

View File

@@ -91,7 +91,6 @@ NO_IOT_CLASS = [
"input_number",
"input_select",
"input_text",
"input_weekday",
"intent_script",
"intent",
"logbook",

View File

@@ -2214,7 +2214,6 @@ NO_QUALITY_SCALE = [
"input_number",
"input_select",
"input_text",
"input_weekday",
"intent_script",
"intent",
"logbook",

View File

@@ -1061,14 +1061,6 @@ def test_weekday_validation() -> None:
}
time.TRIGGER_SCHEMA(valid_config)
# Valid input_weekday entity
valid_config = {
"platform": "time",
"at": "5:00:00",
"weekday": "input_weekday.workdays",
}
time.TRIGGER_SCHEMA(valid_config)
# Invalid weekday
invalid_config = {"platform": "time", "at": "5:00:00", "weekday": "invalid"}
with pytest.raises(vol.Invalid):
@@ -1082,176 +1074,3 @@ def test_weekday_validation() -> None:
}
with pytest.raises(vol.Invalid):
time.TRIGGER_SCHEMA(invalid_config)
# Invalid entity domain
invalid_config = {
"platform": "time",
"at": "5:00:00",
"weekday": "input_boolean.my_bool",
}
with pytest.raises(vol.Invalid):
time.TRIGGER_SCHEMA(invalid_config)
async def test_if_fires_using_weekday_input_weekday_entity(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
service_calls: list[ServiceCall],
) -> None:
"""Test for firing on weekday using input_weekday entity."""
# Setup input_weekday helper with Mon, Tue, Wed
await async_setup_component(
hass,
"input_weekday",
{
"input_weekday": {
"workdays": {
"name": "Work Days",
"weekdays": ["mon", "tue", "wed"],
}
}
},
)
await hass.async_block_till_done()
# Freeze time to Monday, January 2, 2023 at 5:00:00
monday_trigger = dt_util.as_utc(datetime(2023, 1, 2, 5, 0, 0, 0))
freezer.move_to(monday_trigger)
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"platform": "time",
"at": "5:00:00",
"weekday": "input_weekday.workdays",
},
"action": {
"service": "test.automation",
"data_template": {
"some": "{{ trigger.platform }} - {{ trigger.now.strftime('%A') }}",
},
},
}
},
)
await hass.async_block_till_done()
# Fire on Monday - should trigger (Monday is in workdays)
async_fire_time_changed(hass, monday_trigger + timedelta(seconds=1))
await hass.async_block_till_done()
automation_calls = [call for call in service_calls if call.domain == "test"]
assert len(automation_calls) == 1
assert "Monday" in automation_calls[0].data["some"]
# Fire on Tuesday - should trigger (Tuesday is in workdays)
tuesday_trigger = dt_util.as_utc(datetime(2023, 1, 3, 5, 0, 0, 0))
async_fire_time_changed(hass, tuesday_trigger)
await hass.async_block_till_done()
automation_calls = [call for call in service_calls if call.domain == "test"]
assert len(automation_calls) == 2
assert "Tuesday" in automation_calls[1].data["some"]
# Fire on Thursday - should not trigger (Thursday is not in workdays)
thursday_trigger = dt_util.as_utc(datetime(2023, 1, 5, 5, 0, 0, 0))
async_fire_time_changed(hass, thursday_trigger)
await hass.async_block_till_done()
automation_calls = [call for call in service_calls if call.domain == "test"]
assert len(automation_calls) == 2
# Fire on Saturday - should not trigger (Saturday is not in workdays)
saturday_trigger = dt_util.as_utc(datetime(2023, 1, 7, 5, 0, 0, 0))
async_fire_time_changed(hass, saturday_trigger)
await hass.async_block_till_done()
automation_calls = [call for call in service_calls if call.domain == "test"]
assert len(automation_calls) == 2
async def test_if_action_weekday_input_weekday_entity(
hass: HomeAssistant, service_calls: list[ServiceCall]
) -> None:
"""Test time condition with input_weekday entity."""
# Setup input_weekday helper with Sat, Sun
await async_setup_component(
hass,
"input_weekday",
{
"input_weekday": {
"weekend": {"name": "Weekend Days", "weekdays": ["sat", "sun"]}
}
},
)
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {"platform": "event", "event_type": "test_event"},
"condition": {"condition": "time", "weekday": "input_weekday.weekend"},
"action": {"service": "test.automation"},
}
},
)
await hass.async_block_till_done()
days_past_monday = dt_util.now().weekday()
monday = dt_util.now() - timedelta(days=days_past_monday)
saturday = monday + timedelta(days=5)
sunday = saturday + timedelta(days=1)
# Test on Monday - should not trigger (not in weekend)
with patch("homeassistant.helpers.condition.dt_util.now", return_value=monday):
hass.bus.async_fire("test_event")
await hass.async_block_till_done()
assert len(service_calls) == 0
# Test on Saturday - should trigger
with patch("homeassistant.helpers.condition.dt_util.now", return_value=saturday):
hass.bus.async_fire("test_event")
await hass.async_block_till_done()
assert len(service_calls) == 1
# Test on Sunday - should trigger
with patch("homeassistant.helpers.condition.dt_util.now", return_value=sunday):
hass.bus.async_fire("test_event")
await hass.async_block_till_done()
assert len(service_calls) == 2
async def test_if_fires_weekday_entity_unavailable(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
service_calls: list[ServiceCall],
) -> None:
"""Test that trigger does not fire when input_weekday entity is unavailable."""
# Freeze time to Monday, January 2, 2023 at 5:00:00
monday_trigger = dt_util.as_utc(datetime(2023, 1, 2, 5, 0, 0, 0))
freezer.move_to(monday_trigger)
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"platform": "time",
"at": "5:00:00",
"weekday": "input_weekday.nonexistent",
},
"action": {
"service": "test.automation",
},
}
},
)
await hass.async_block_till_done()
# Fire on Monday - should not trigger (entity doesn't exist)
async_fire_time_changed(hass, monday_trigger + timedelta(seconds=1))
await hass.async_block_till_done()
automation_calls = [call for call in service_calls if call.domain == "test"]
assert len(automation_calls) == 0

View File

@@ -1 +0,0 @@
"""Tests for the Input Weekday component."""

View File

@@ -1,518 +0,0 @@
"""Tests for the Input Weekday component."""
from typing import Any
from unittest.mock import patch
import pytest
from homeassistant.components.input_weekday import (
ATTR_WEEKDAY,
ATTR_WEEKDAYS,
DOMAIN,
SERVICE_ADD_WEEKDAY,
SERVICE_CLEAR,
SERVICE_REMOVE_WEEKDAY,
SERVICE_SET_WEEKDAYS,
SERVICE_TOGGLE_WEEKDAY,
STORAGE_VERSION,
)
from homeassistant.const import (
ATTR_EDITABLE,
ATTR_ENTITY_ID,
ATTR_FRIENDLY_NAME,
SERVICE_RELOAD,
)
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component
from tests.common import mock_restore_cache
from tests.typing import WebSocketGenerator
@pytest.fixture
def storage_setup(hass: HomeAssistant, hass_storage: dict[str, Any]):
"""Storage setup."""
async def _storage(items=None, config=None):
if items is None:
hass_storage[DOMAIN] = {
"key": DOMAIN,
"version": STORAGE_VERSION,
"data": {
"items": [
{
"id": "from_storage",
"name": "from storage",
"weekdays": ["mon", "wed", "fri"],
}
]
},
}
else:
hass_storage[DOMAIN] = {
"key": DOMAIN,
"version": STORAGE_VERSION,
"data": {"items": items},
}
if config is None:
config = {DOMAIN: {}}
return await async_setup_component(hass, DOMAIN, config)
return _storage
@pytest.mark.parametrize(
"invalid_config",
[
None,
{"name with space": None},
{"bad_weekdays": {"weekdays": ["invalid"]}},
],
)
async def test_config(hass: HomeAssistant, invalid_config) -> None:
"""Test config."""
assert not await async_setup_component(hass, DOMAIN, {DOMAIN: invalid_config})
async def test_set_weekdays(hass: HomeAssistant) -> None:
"""Test set_weekdays service."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["mon", "tue"]}}},
)
entity_id = "input_weekday.test_1"
state = hass.states.get(entity_id)
assert state.state == "mon,tue"
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "tue"]
await hass.services.async_call(
DOMAIN,
SERVICE_SET_WEEKDAYS,
{ATTR_ENTITY_ID: entity_id, ATTR_WEEKDAYS: ["wed", "thu", "fri"]},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.state == "wed,thu,fri"
assert state.attributes[ATTR_WEEKDAYS] == ["wed", "thu", "fri"]
async def test_set_weekdays_removes_duplicates(hass: HomeAssistant) -> None:
"""Test set_weekdays removes duplicate weekdays."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": []}}},
)
entity_id = "input_weekday.test_1"
await hass.services.async_call(
DOMAIN,
SERVICE_SET_WEEKDAYS,
{ATTR_ENTITY_ID: entity_id, ATTR_WEEKDAYS: ["mon", "tue", "mon", "wed"]},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "tue", "wed"]
async def test_add_weekday(hass: HomeAssistant) -> None:
"""Test add_weekday service."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["mon"]}}},
)
entity_id = "input_weekday.test_1"
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon"]
await hass.services.async_call(
DOMAIN,
SERVICE_ADD_WEEKDAY,
{ATTR_ENTITY_ID: entity_id, ATTR_WEEKDAY: "wed"},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "wed"]
# Adding duplicate should not add it again
await hass.services.async_call(
DOMAIN,
SERVICE_ADD_WEEKDAY,
{ATTR_ENTITY_ID: entity_id, ATTR_WEEKDAY: "mon"},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "wed"]
async def test_remove_weekday(hass: HomeAssistant) -> None:
"""Test remove_weekday service."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["mon", "wed", "fri"]}}},
)
entity_id = "input_weekday.test_1"
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "wed", "fri"]
await hass.services.async_call(
DOMAIN,
SERVICE_REMOVE_WEEKDAY,
{ATTR_ENTITY_ID: entity_id, ATTR_WEEKDAY: "wed"},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "fri"]
# Removing non-existent weekday should not error
await hass.services.async_call(
DOMAIN,
SERVICE_REMOVE_WEEKDAY,
{ATTR_ENTITY_ID: entity_id, ATTR_WEEKDAY: "wed"},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "fri"]
async def test_toggle_weekday(hass: HomeAssistant) -> None:
"""Test toggle_weekday service."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["mon"]}}},
)
entity_id = "input_weekday.test_1"
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon"]
# Toggle off (remove)
await hass.services.async_call(
DOMAIN,
SERVICE_TOGGLE_WEEKDAY,
{ATTR_ENTITY_ID: entity_id, ATTR_WEEKDAY: "mon"},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == []
# Toggle on (add)
await hass.services.async_call(
DOMAIN,
SERVICE_TOGGLE_WEEKDAY,
{ATTR_ENTITY_ID: entity_id, ATTR_WEEKDAY: "tue"},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["tue"]
async def test_clear(hass: HomeAssistant) -> None:
"""Test clear service."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["mon", "wed", "fri"]}}},
)
entity_id = "input_weekday.test_1"
state = hass.states.get(entity_id)
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "wed", "fri"]
await hass.services.async_call(
DOMAIN,
SERVICE_CLEAR,
{ATTR_ENTITY_ID: entity_id},
blocking=True,
)
state = hass.states.get(entity_id)
assert state.state == ""
assert state.attributes[ATTR_WEEKDAYS] == []
async def test_config_with_name(hass: HomeAssistant) -> None:
"""Test configuration with name."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"name": "Test Weekday", "weekdays": ["sat", "sun"]}}},
)
state = hass.states.get("input_weekday.test_1")
assert state is not None
assert state.attributes[ATTR_FRIENDLY_NAME] == "Test Weekday"
assert state.attributes[ATTR_WEEKDAYS] == ["sat", "sun"]
async def test_empty_weekdays(hass: HomeAssistant) -> None:
"""Test empty weekdays configuration."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": []}}},
)
state = hass.states.get("input_weekday.test_1")
assert state is not None
assert state.state == ""
assert state.attributes[ATTR_WEEKDAYS] == []
async def test_default_weekdays(hass: HomeAssistant) -> None:
"""Test default weekdays (empty list)."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {}}},
)
state = hass.states.get("input_weekday.test_1")
assert state is not None
assert state.state == ""
assert state.attributes[ATTR_WEEKDAYS] == []
async def test_config_removes_duplicates(hass: HomeAssistant) -> None:
"""Test that configuration removes duplicate weekdays."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["mon", "tue", "mon", "wed"]}}},
)
state = hass.states.get("input_weekday.test_1")
assert state is not None
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "tue", "wed"]
async def test_reload(hass: HomeAssistant) -> None:
"""Test reload service."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["mon"]}}},
)
state_1 = hass.states.get("input_weekday.test_1")
state_2 = hass.states.get("input_weekday.test_2")
assert state_1 is not None
assert state_2 is None
assert state_1.attributes[ATTR_WEEKDAYS] == ["mon"]
with patch(
"homeassistant.config.load_yaml_config_file",
return_value={
DOMAIN: {
"test_2": {"weekdays": ["tue", "thu"]},
}
},
):
await hass.services.async_call(
DOMAIN,
SERVICE_RELOAD,
blocking=True,
)
await hass.async_block_till_done()
state_1 = hass.states.get("input_weekday.test_1")
state_2 = hass.states.get("input_weekday.test_2")
assert state_1 is None
assert state_2 is not None
assert state_2.attributes[ATTR_WEEKDAYS] == ["tue", "thu"]
async def test_state_restoration(hass: HomeAssistant) -> None:
"""Test state restoration."""
mock_restore_cache(
hass,
(
State(
"input_weekday.test_1",
"mon,wed,fri",
{ATTR_WEEKDAYS: ["mon", "wed", "fri"]},
),
),
)
hass.state = "starting"
await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {}}},
)
state = hass.states.get("input_weekday.test_1")
assert state
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "wed", "fri"]
async def test_state_restoration_with_initial(hass: HomeAssistant) -> None:
"""Test state restoration with initial value - should prefer initial."""
mock_restore_cache(
hass,
(
State(
"input_weekday.test_1",
"mon,wed,fri",
{ATTR_WEEKDAYS: ["mon", "wed", "fri"]},
),
),
)
hass.state = "starting"
await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["sat", "sun"]}}},
)
state = hass.states.get("input_weekday.test_1")
assert state
assert state.attributes[ATTR_WEEKDAYS] == ["sat", "sun"]
async def test_storage(hass: HomeAssistant, storage_setup) -> None:
"""Test storage."""
assert await storage_setup()
state = hass.states.get("input_weekday.from_storage")
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "wed", "fri"]
assert state.attributes[ATTR_EDITABLE]
async def test_editable_state_attribute(hass: HomeAssistant) -> None:
"""Test editable attribute."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {"test_1": {"weekdays": ["mon"]}}},
)
state = hass.states.get("input_weekday.test_1")
assert state.attributes[ATTR_EDITABLE] is False
async def test_websocket_create(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test create via websocket."""
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
client = await hass_ws_client(hass)
await client.send_json(
{
"id": 1,
"type": f"{DOMAIN}/create",
"name": "My Weekday",
"weekdays": ["mon", "fri"],
}
)
resp = await client.receive_json()
assert resp["success"]
state = hass.states.get("input_weekday.my_weekday")
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "fri"]
async def test_websocket_update(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
entity_registry: er.EntityRegistry,
) -> None:
"""Test update via websocket."""
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
client = await hass_ws_client(hass)
await client.send_json(
{
"id": 1,
"type": f"{DOMAIN}/create",
"name": "My Weekday",
"weekdays": ["mon"],
}
)
resp = await client.receive_json()
assert resp["success"]
state = hass.states.get("input_weekday.my_weekday")
assert state.attributes[ATTR_WEEKDAYS] == ["mon"]
entity_entry = entity_registry.async_get("input_weekday.my_weekday")
await client.send_json(
{
"id": 2,
"type": f"{DOMAIN}/update",
f"{DOMAIN}_id": entity_entry.unique_id,
"weekdays": ["tue", "wed"],
"name": "Updated Weekday",
}
)
resp = await client.receive_json()
assert resp["success"]
state = hass.states.get("input_weekday.my_weekday")
assert state.attributes[ATTR_WEEKDAYS] == ["tue", "wed"]
assert state.attributes[ATTR_FRIENDLY_NAME] == "Updated Weekday"
async def test_websocket_delete(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
entity_registry: er.EntityRegistry,
) -> None:
"""Test delete via websocket."""
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
client = await hass_ws_client(hass)
await client.send_json(
{
"id": 1,
"type": f"{DOMAIN}/create",
"name": "My Weekday",
"weekdays": ["mon"],
}
)
resp = await client.receive_json()
assert resp["success"]
state = hass.states.get("input_weekday.my_weekday")
assert state is not None
entity_entry = entity_registry.async_get("input_weekday.my_weekday")
await client.send_json(
{
"id": 2,
"type": f"{DOMAIN}/delete",
f"{DOMAIN}_id": entity_entry.unique_id,
}
)
resp = await client.receive_json()
assert resp["success"]
state = hass.states.get("input_weekday.my_weekday")
assert state is None

View File

@@ -1,37 +0,0 @@
"""Tests for the Input Weekday recorder."""
from homeassistant.components.input_weekday import ATTR_EDITABLE, ATTR_WEEKDAYS
from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.history import get_significant_states
from homeassistant.const import ATTR_FRIENDLY_NAME
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from tests.components.recorder.common import async_wait_recording_done
async def test_exclude_attributes(recorder_mock: Recorder, hass: HomeAssistant) -> None:
"""Test that certain attributes are excluded."""
now = dt_util.utcnow()
assert await async_setup_component(
hass,
"input_weekday",
{"input_weekday": {"test": {"weekdays": ["mon", "wed"]}}},
)
state = hass.states.get("input_weekday.test")
assert state.attributes[ATTR_WEEKDAYS] == ["mon", "wed"]
assert state.attributes[ATTR_EDITABLE] is False
await async_wait_recording_done(hass)
states = await hass.async_add_executor_job(
get_significant_states, hass, now, None, ["input_weekday.test"]
)
assert len(states) == 1
for entity_states in states.values():
for state in entity_states:
assert ATTR_WEEKDAYS in state.attributes
assert ATTR_EDITABLE not in state.attributes
assert ATTR_FRIENDLY_NAME in state.attributes

View File

@@ -1,59 +0,0 @@
"""Test reproduce state for Input Weekday."""
import pytest
from homeassistant.components.input_weekday import ATTR_WEEKDAYS, DOMAIN
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.state import async_reproduce_state
from homeassistant.setup import async_setup_component
from tests.common import async_mock_service
@pytest.fixture
async def setup_component(hass: HomeAssistant):
"""Set up component."""
assert await async_setup_component(
hass, DOMAIN, {DOMAIN: {"test_weekday": {"weekdays": []}}}
)
async def test_reproduce_weekday(hass: HomeAssistant) -> None:
"""Test reproduce weekday."""
calls = async_mock_service(hass, DOMAIN, "set_weekdays")
await async_reproduce_state(
hass,
[
State(
"input_weekday.test_weekday",
"mon,wed,fri",
{ATTR_WEEKDAYS: ["mon", "wed", "fri"]},
)
],
)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data == {
"entity_id": "input_weekday.test_weekday",
ATTR_WEEKDAYS: ["mon", "wed", "fri"],
}
async def test_reproduce_weekday_missing_attribute(
hass: HomeAssistant, setup_component, caplog: pytest.LogCaptureFixture
) -> None:
"""Test reproduce weekday with missing weekdays attribute."""
calls = async_mock_service(hass, DOMAIN, "set_weekdays")
await async_reproduce_state(
hass,
[State("input_weekday.test_weekday", "mon,wed")],
)
await hass.async_block_till_done()
assert len(calls) == 0
assert "weekdays attribute is missing" in caplog.text

View File

@@ -0,0 +1,65 @@
"""Fixtures for the london_underground tests."""
from collections.abc import AsyncGenerator
import json
from unittest.mock import AsyncMock, patch
from london_tube_status import parse_api_response
import pytest
from homeassistant.components.london_underground.const import CONF_LINE, DOMAIN
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry, async_load_fixture
from tests.conftest import AiohttpClientMocker
@pytest.fixture
def mock_setup_entry():
"""Prevent setup of integration during tests."""
with patch(
"homeassistant.components.london_underground.async_setup_entry",
return_value=True,
) as mock_setup:
yield mock_setup
@pytest.fixture
async def mock_config_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Mock the config entry."""
entry = MockConfigEntry(
domain=DOMAIN,
data={},
options={CONF_LINE: ["Metropolitan"]},
title="London Underground",
)
# Add and set up the entry
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
return entry
@pytest.fixture
async def mock_london_underground_client(
hass: HomeAssistant,
aioclient_mock: AiohttpClientMocker,
) -> AsyncGenerator[AsyncMock]:
"""Mock a London Underground client."""
with (
patch(
"homeassistant.components.london_underground.TubeData",
autospec=True,
) as mock_client,
patch(
"homeassistant.components.london_underground.config_flow.TubeData",
new=mock_client,
),
):
client = mock_client.return_value
# Load the fixture text
fixture_text = await async_load_fixture(hass, "line_status.json", DOMAIN)
fixture_data = parse_api_response(json.loads(fixture_text))
client.data = fixture_data
yield client

View File

@@ -0,0 +1,186 @@
"""Test the London Underground config flow."""
import asyncio
import pytest
from homeassistant.components.london_underground.const import (
CONF_LINE,
DEFAULT_LINES,
DOMAIN,
)
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import issue_registry as ir
async def test_validate_input_success(
hass: HomeAssistant, mock_setup_entry, mock_london_underground_client
) -> None:
"""Test successful validation of TfL API."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_LINE: ["Bakerloo", "Central"]},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "London Underground"
assert result["data"] == {}
assert result["options"] == {CONF_LINE: ["Bakerloo", "Central"]}
async def test_options(
hass: HomeAssistant, mock_setup_entry, mock_config_entry
) -> None:
"""Test updating options."""
result = await hass.config_entries.options.async_init(mock_config_entry.entry_id)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_LINE: ["Bakerloo", "Central"],
},
)
assert result["type"] == FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_LINE: ["Bakerloo", "Central"],
}
@pytest.mark.parametrize(
("side_effect", "expected_error"),
[
(Exception, "cannot_connect"),
(asyncio.TimeoutError, "timeout_connect"),
],
)
async def test_validate_input_exceptions(
hass: HomeAssistant,
mock_setup_entry,
mock_london_underground_client,
side_effect,
expected_error,
) -> None:
"""Test validation with connection and timeout errors."""
mock_london_underground_client.update.side_effect = side_effect
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_LINE: ["Bakerloo", "Central"]},
)
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == expected_error
# confirm recovery after error
mock_london_underground_client.update.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "London Underground"
assert result["data"] == {}
assert result["options"] == {CONF_LINE: DEFAULT_LINES}
async def test_already_configured(
hass: HomeAssistant,
mock_london_underground_client,
mock_setup_entry,
mock_config_entry,
) -> None:
"""Try (and fail) setting up a config entry when one already exists."""
# Try to start the flow
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "single_instance_allowed"
async def test_yaml_import(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
mock_london_underground_client,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test a YAML sensor is imported and becomes an operational config entry."""
# Set up via YAML which will trigger import and set up the config entry
IMPORT_DATA = {
"platform": "london_underground",
"line": ["Central", "Piccadilly", "Victoria", "Bakerloo", "Northern"],
}
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=IMPORT_DATA
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "London Underground"
assert result["data"] == {}
assert result["options"] == {
CONF_LINE: ["Central", "Piccadilly", "Victoria", "Bakerloo", "Northern"]
}
async def test_failed_yaml_import_connection(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
mock_london_underground_client,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test a YAML sensor is imported and becomes an operational config entry."""
# Set up via YAML which will trigger import and set up the config entry
mock_london_underground_client.update.side_effect = asyncio.TimeoutError
IMPORT_DATA = {
"platform": "london_underground",
"line": ["Central", "Piccadilly", "Victoria", "Bakerloo", "Northern"],
}
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=IMPORT_DATA
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "cannot_connect"
async def test_failed_yaml_import_already_configured(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
mock_london_underground_client,
caplog: pytest.LogCaptureFixture,
mock_config_entry,
) -> None:
"""Test a YAML sensor is imported and becomes an operational config entry."""
# Set up via YAML which will trigger import and set up the config entry
IMPORT_DATA = {
"platform": "london_underground",
"line": ["Central", "Piccadilly", "Victoria", "Bakerloo", "Northern"],
}
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=IMPORT_DATA
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "single_instance_allowed"

View File

@@ -0,0 +1,20 @@
"""Test the London Underground init."""
from homeassistant.core import HomeAssistant
async def test_reload_entry(
hass: HomeAssistant, mock_london_underground_client, mock_config_entry
) -> None:
"""Test reloading the config entry."""
# Test reloading with updated options
hass.config_entries.async_update_entry(
mock_config_entry,
data={},
options={"line": ["Bakerloo", "Central"]},
)
await hass.async_block_till_done()
# Verify that setup was called for each reload
assert len(mock_london_underground_client.mock_calls) > 0

View File

@@ -1,37 +1,130 @@
"""The tests for the london_underground platform."""
from london_tube_status import API_URL
import asyncio
import pytest
from homeassistant.components.london_underground.const import CONF_LINE, DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.helpers import issue_registry as ir
from homeassistant.setup import async_setup_component
from tests.common import async_load_fixture
from tests.test_util.aiohttp import AiohttpClientMocker
VALID_CONFIG = {
"sensor": {"platform": "london_underground", CONF_LINE: ["Metropolitan"]}
}
async def test_valid_state(
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
mock_london_underground_client,
mock_config_entry,
) -> None:
"""Test for operational london_underground sensor with proper attributes."""
aioclient_mock.get(
API_URL,
text=await async_load_fixture(hass, "line_status.json", DOMAIN),
)
"""Test operational London Underground sensor using a mock config entry."""
# Ensure the entry is fully loaded
assert mock_config_entry.state is ConfigEntryState.LOADED
# Confirm that the expected entity exists and is correct
state = hass.states.get("sensor.london_underground_metropolitan")
assert state is not None
assert state.state == "Good Service"
assert state.attributes == {
"Description": "Nothing to report",
"attribution": "Powered by TfL Open Data",
"friendly_name": "London Underground Metropolitan",
"icon": "mdi:subway",
}
# No YAML warning should be issued, since setup was not via YAML
assert not issue_registry.async_get_issue(DOMAIN, "yaml_deprecated")
async def test_yaml_import(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
mock_london_underground_client,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test a YAML sensor is imported and becomes an operational config entry."""
# Set up via YAML which will trigger import and set up the config entry
VALID_CONFIG = {
"sensor": {
"platform": "london_underground",
CONF_LINE: ["Metropolitan", "London Overground"],
}
}
assert await async_setup_component(hass, "sensor", VALID_CONFIG)
await hass.async_block_till_done()
state = hass.states.get("sensor.metropolitan")
# Verify the config entry was created
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
# Verify a warning was issued about YAML deprecation
assert issue_registry.async_get_issue(HOMEASSISTANT_DOMAIN, "deprecated_yaml")
# Check the state after setup completes
state = hass.states.get("sensor.london_underground_metropolitan")
assert state
assert state.state == "Good Service"
assert state.attributes == {
"Description": "Nothing to report",
"attribution": "Powered by TfL Open Data",
"friendly_name": "Metropolitan",
"friendly_name": "London Underground Metropolitan",
"icon": "mdi:subway",
}
# Since being renamed London overground is no longer returned by the API
# So check that we do not import it and that we warn the user
state = hass.states.get("sensor.london_underground_london_overground")
assert not state
assert any(
"London Overground was removed from the configuration as the line has been divided and renamed"
in record.message
for record in caplog.records
)
async def test_failed_yaml_import(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
mock_london_underground_client,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test a YAML sensor is imported and becomes an operational config entry."""
# Set up via YAML which will trigger import and set up the config entry
mock_london_underground_client.update.side_effect = asyncio.TimeoutError
VALID_CONFIG = {
"sensor": {"platform": "london_underground", CONF_LINE: ["Metropolitan"]}
}
assert await async_setup_component(hass, "sensor", VALID_CONFIG)
await hass.async_block_till_done()
# Verify the config entry was not created
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 0
# verify no flows still in progress
flows = hass.config_entries.flow.async_progress()
assert len(flows) == 0
assert any(
"Unexpected error trying to connect before importing config" in record.message
for record in caplog.records
)
# Confirm that the import did not happen
assert not any(
"Importing London Underground config from configuration.yaml" in record.message
for record in caplog.records
)
assert not any(
"migrated to a config entry and can be safely removed" in record.message
for record in caplog.records
)
# Verify a warning was issued about YAML not being imported
assert issue_registry.async_get_issue(
DOMAIN, "deprecated_yaml_import_issue_cannot_connect"
)

View File

@@ -115,3 +115,119 @@
'state': '0',
})
# ---
# name: test_cury_number_entity[number.test_name_left_slot_intensity-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'max': 100,
'min': 0,
'mode': <NumberMode.SLIDER: 'slider'>,
'step': 1,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'number',
'entity_category': None,
'entity_id': 'number.test_name_left_slot_intensity',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Left slot intensity',
'platform': 'shelly',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '123456789ABC-cury:0-left_slot_intensity',
'unit_of_measurement': '%',
})
# ---
# name: test_cury_number_entity[number.test_name_left_slot_intensity-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test name Left slot intensity',
'max': 100,
'min': 0,
'mode': <NumberMode.SLIDER: 'slider'>,
'step': 1,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'number.test_name_left_slot_intensity',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '70',
})
# ---
# name: test_cury_number_entity[number.test_name_right_slot_intensity-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'max': 100,
'min': 0,
'mode': <NumberMode.SLIDER: 'slider'>,
'step': 1,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'number',
'entity_category': None,
'entity_id': 'number.test_name_right_slot_intensity',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Right slot intensity',
'platform': 'shelly',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '123456789ABC-cury:0-right_slot_intensity',
'unit_of_measurement': '%',
})
# ---
# name: test_cury_number_entity[number.test_name_right_slot_intensity-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test name Right slot intensity',
'max': 100,
'min': 0,
'mode': <NumberMode.SLIDER: 'slider'>,
'step': 1,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'number.test_name_right_slot_intensity',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '70',
})
# ---

View File

@@ -157,6 +157,206 @@
'state': '0',
})
# ---
# name: test_cury_sensor_entity[sensor.test_name_left_slot_level-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.test_name_left_slot_level',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Left slot level',
'platform': 'shelly',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'vial_level',
'unique_id': '123456789ABC-cury:0-cury_left_level',
'unit_of_measurement': '%',
})
# ---
# name: test_cury_sensor_entity[sensor.test_name_left_slot_level-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test name Left slot level',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'sensor.test_name_left_slot_level',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '27',
})
# ---
# name: test_cury_sensor_entity[sensor.test_name_left_slot_vial-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.test_name_left_slot_vial',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Left slot vial',
'platform': 'shelly',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'vial_name',
'unique_id': '123456789ABC-cury:0-cury_left_vial',
'unit_of_measurement': None,
})
# ---
# name: test_cury_sensor_entity[sensor.test_name_left_slot_vial-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test name Left slot vial',
}),
'context': <ANY>,
'entity_id': 'sensor.test_name_left_slot_vial',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'Forest Dream',
})
# ---
# name: test_cury_sensor_entity[sensor.test_name_right_slot_level-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.test_name_right_slot_level',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Right slot level',
'platform': 'shelly',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'vial_level',
'unique_id': '123456789ABC-cury:0-cury_right_level',
'unit_of_measurement': '%',
})
# ---
# name: test_cury_sensor_entity[sensor.test_name_right_slot_level-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test name Right slot level',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'sensor.test_name_right_slot_level',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '84',
})
# ---
# name: test_cury_sensor_entity[sensor.test_name_right_slot_vial-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.test_name_right_slot_vial',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Right slot vial',
'platform': 'shelly',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'vial_name',
'unique_id': '123456789ABC-cury:0-cury_right_vial',
'unit_of_measurement': None,
})
# ---
# name: test_cury_sensor_entity[sensor.test_name_right_slot_vial-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test name Right slot vial',
}),
'context': <ANY>,
'entity_id': 'sensor.test_name_right_slot_vial',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'Velvet Rose',
})
# ---
# name: test_rpc_shelly_ev_sensors[sensor.test_name_charger_state-entry]
EntityRegistryEntrySnapshot({
'aliases': set({

View File

@@ -0,0 +1,97 @@
# serializer version: 1
# name: test_cury_switch_entity[switch.test_name_left_slot-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'switch',
'entity_category': None,
'entity_id': 'switch.test_name_left_slot',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Left slot',
'platform': 'shelly',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'cury_slot',
'unique_id': '123456789ABC-cury:0-cury_left',
'unit_of_measurement': None,
})
# ---
# name: test_cury_switch_entity[switch.test_name_left_slot-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test name Left slot',
}),
'context': <ANY>,
'entity_id': 'switch.test_name_left_slot',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'on',
})
# ---
# name: test_cury_switch_entity[switch.test_name_right_slot-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'switch',
'entity_category': None,
'entity_id': 'switch.test_name_right_slot',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Right slot',
'platform': 'shelly',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'cury_slot',
'unique_id': '123456789ABC-cury:0-cury_right',
'unit_of_measurement': None,
})
# ---
# name: test_cury_switch_entity[switch.test_name_right_slot-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test name Right slot',
}),
'context': <ANY>,
'entity_id': 'switch.test_name_right_slot',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'off',
})
# ---

View File

@@ -568,3 +568,50 @@ async def test_blu_trv_number_reauth_error(
assert "context" in flow
assert flow["context"].get("source") == SOURCE_REAUTH
assert flow["context"].get("entry_id") == entry.entry_id
async def test_cury_number_entity(
hass: HomeAssistant,
mock_rpc_device: Mock,
entity_registry: EntityRegistry,
snapshot: SnapshotAssertion,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test number entities for cury component."""
status = {
"cury:0": {
"id": 0,
"slots": {
"left": {
"intensity": 70,
"on": True,
"vial": {"level": 27, "name": "Forest Dream"},
},
"right": {
"intensity": 70,
"on": False,
"vial": {"level": 84, "name": "Velvet Rose"},
},
},
}
}
monkeypatch.setattr(mock_rpc_device, "status", status)
await init_integration(hass, 3)
for entity in ("left_slot_intensity", "right_slot_intensity"):
entity_id = f"{NUMBER_DOMAIN}.test_name_{entity}"
state = hass.states.get(entity_id)
assert state == snapshot(name=f"{entity_id}-state")
entry = entity_registry.async_get(entity_id)
assert entry == snapshot(name=f"{entity_id}-entry")
await hass.services.async_call(
NUMBER_DOMAIN,
SERVICE_SET_VALUE,
{ATTR_ENTITY_ID: "number.test_name_left_slot_intensity", ATTR_VALUE: 80.0},
blocking=True,
)
mock_rpc_device.mock_update()
mock_rpc_device.cury_set.assert_called_once_with(0, slot="left", intensity=80)

View File

@@ -1949,3 +1949,46 @@ async def test_rpc_pm1_energy_consumed_sensor_non_float_value(
assert (state := hass.states.get(entity_id))
assert state.state == STATE_UNKNOWN
async def test_cury_sensor_entity(
hass: HomeAssistant,
mock_rpc_device: Mock,
entity_registry: EntityRegistry,
snapshot: SnapshotAssertion,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test sensor entities for cury component."""
status = {
"cury:0": {
"id": 0,
"slots": {
"left": {
"intensity": 70,
"on": True,
"vial": {"level": 27, "name": "Forest Dream"},
},
"right": {
"intensity": 70,
"on": False,
"vial": {"level": 84, "name": "Velvet Rose"},
},
},
}
}
monkeypatch.setattr(mock_rpc_device, "status", status)
await init_integration(hass, 3)
for entity in (
"left_slot_level",
"right_slot_level",
"left_slot_vial",
"right_slot_vial",
):
entity_id = f"{SENSOR_DOMAIN}.test_name_{entity}"
state = hass.states.get(entity_id)
assert state == snapshot(name=f"{entity_id}-state")
entry = entity_registry.async_get(entity_id)
assert entry == snapshot(name=f"{entity_id}-entry")

View File

@@ -8,6 +8,7 @@ from aioshelly.const import MODEL_1PM, MODEL_GAS, MODEL_MOTION
from aioshelly.exceptions import DeviceConnectionError, InvalidAuthError, RpcCallError
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.components.climate import DOMAIN as CLIMATE_DOMAIN
from homeassistant.components.shelly.const import (
@@ -24,6 +25,7 @@ from homeassistant.const import (
SERVICE_TURN_ON,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
Platform,
)
@@ -35,6 +37,7 @@ from homeassistant.helpers.entity_registry import EntityRegistry
from . import (
init_integration,
inject_rpc_device_event,
mutate_rpc_device_status,
patch_platforms,
register_device,
register_entity,
@@ -829,3 +832,119 @@ async def test_rpc_device_script_switch(
assert (state := hass.states.get(entity_id))
assert state.state == STATE_ON
mock_rpc_device.script_start.assert_called_once_with(1)
async def test_cury_switch_entity(
hass: HomeAssistant,
mock_rpc_device: Mock,
entity_registry: EntityRegistry,
snapshot: SnapshotAssertion,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test switch entities for cury component."""
status = {
"cury:0": {
"id": 0,
"slots": {
"left": {
"intensity": 70,
"on": True,
"vial": {"level": 27, "name": "Forest Dream"},
},
"right": {
"intensity": 70,
"on": False,
"vial": {"level": 84, "name": "Velvet Rose"},
},
},
}
}
monkeypatch.setattr(mock_rpc_device, "status", status)
await init_integration(hass, 3)
for entity in ("left_slot", "right_slot"):
entity_id = f"{SWITCH_DOMAIN}.test_name_{entity}"
state = hass.states.get(entity_id)
assert state == snapshot(name=f"{entity_id}-state")
entry = entity_registry.async_get(entity_id)
assert entry == snapshot(name=f"{entity_id}-entry")
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "switch.test_name_left_slot"},
blocking=True,
)
mock_rpc_device.mock_update()
mock_rpc_device.cury_set.assert_called_once_with(0, "left", False)
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "switch.test_name_right_slot"},
blocking=True,
)
mock_rpc_device.mock_update()
mock_rpc_device.cury_set.assert_called_with(0, "right", True)
async def test_cury_switch_availability(
hass: HomeAssistant,
mock_rpc_device: Mock,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test availability of switch entities for cury component."""
slots = {
"left": {
"intensity": 70,
"on": True,
"vial": {"level": 27, "name": "Forest Dream"},
},
"right": {
"intensity": 70,
"on": False,
"vial": {"level": 84, "name": "Velvet Rose"},
},
}
status = {"cury:0": {"id": 0, "slots": slots}}
monkeypatch.setattr(mock_rpc_device, "status", status)
await init_integration(hass, 3)
entity_id = f"{SWITCH_DOMAIN}.test_name_left_slot"
assert (state := hass.states.get(entity_id))
assert state.state == STATE_ON
slots["left"]["vial"]["level"] = -1
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cury:0", "slots", slots)
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.state == STATE_UNAVAILABLE
slots["left"].pop("vial")
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cury:0", "slots", slots)
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.state == STATE_UNAVAILABLE
slots["left"] = None
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cury:0", "slots", slots)
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.state == STATE_UNAVAILABLE
slots["left"] = {
"intensity": 70,
"on": True,
"vial": {"level": 27, "name": "Forest Dream"},
}
mutate_rpc_device_status(monkeypatch, mock_rpc_device, "cury:0", "slots", slots)
mock_rpc_device.mock_update()
assert (state := hass.states.get(entity_id))
assert state.state == STATE_ON

View File

@@ -6,7 +6,7 @@ from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from telegram import Bot, Chat, ChatFullInfo, Message, User
from telegram import Bot, Chat, ChatFullInfo, Message, User, WebhookInfo
from telegram.constants import AccentColor, ChatType
from homeassistant.components.telegram_bot import (
@@ -74,11 +74,22 @@ def mock_register_webhook() -> Generator[None]:
"""Mock calls made by telegram_bot when (de)registering webhook."""
with (
patch(
"homeassistant.components.telegram_bot.webhooks.PushBot.register_webhook",
return_value=True,
"homeassistant.components.telegram_bot.webhooks.Bot.delete_webhook",
AsyncMock(),
),
patch(
"homeassistant.components.telegram_bot.webhooks.PushBot.deregister_webhook",
"homeassistant.components.telegram_bot.webhooks.Bot.get_webhook_info",
AsyncMock(
return_value=WebhookInfo(
url="mock url",
last_error_date=datetime.now(),
has_custom_certificate=False,
pending_update_count=0,
)
),
),
patch(
"homeassistant.components.telegram_bot.webhooks.Bot.set_webhook",
return_value=True,
),
):
@@ -113,9 +124,6 @@ def mock_external_calls() -> Generator[None]:
super().__init__(*args, **kwargs)
self._bot_user = test_user
async def delete_webhook(self) -> bool:
return True
with (
patch("homeassistant.components.telegram_bot.bot.Bot", BotMock),
patch.object(BotMock, "get_chat", return_value=test_chat),

View File

@@ -1,12 +1,11 @@
"""Tests for webhooks."""
from datetime import datetime
from ipaddress import IPv4Network
from unittest.mock import AsyncMock, patch
from unittest.mock import patch
from telegram import WebhookInfo
from telegram.error import TimedOut
from homeassistant.components.telegram_bot.const import DOMAIN
from homeassistant.components.telegram_bot.webhooks import TELEGRAM_WEBHOOK_URL
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
@@ -19,91 +18,61 @@ async def test_set_webhooks_failed(
hass: HomeAssistant,
mock_webhooks_config_entry: MockConfigEntry,
mock_external_calls: None,
mock_generate_secret_token,
mock_register_webhook: None,
) -> None:
"""Test set webhooks failed."""
mock_webhooks_config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.telegram_bot.webhooks.Bot.get_webhook_info",
AsyncMock(
return_value=WebhookInfo(
url="mock url",
last_error_date=datetime.now(),
has_custom_certificate=False,
pending_update_count=0,
)
),
) as mock_webhook_info,
"homeassistant.components.telegram_bot.webhooks.secrets.choice",
return_value="DEADBEEF12345678DEADBEEF87654321",
),
patch(
"homeassistant.components.telegram_bot.webhooks.Bot.set_webhook",
) as mock_set_webhook,
patch(
"homeassistant.components.telegram_bot.webhooks.ApplicationBuilder"
) as application_builder_class,
):
mock_set_webhook.side_effect = [TimedOut("mock timeout"), False]
application = application_builder_class.return_value.bot.return_value.updater.return_value.build.return_value
application.initialize = AsyncMock()
application.start = AsyncMock()
await hass.config_entries.async_setup(mock_webhooks_config_entry.entry_id)
await hass.async_block_till_done()
await hass.async_stop()
mock_webhook_info.assert_called_once()
application.initialize.assert_called_once()
application.start.assert_called_once()
assert mock_set_webhook.call_count > 0
# first fail with exception, second fail with False
assert mock_set_webhook.call_count == 2
# SETUP_ERROR is result of ConfigEntryNotReady("Failed to register webhook with Telegram") in webhooks.py
assert mock_webhooks_config_entry.state == ConfigEntryState.SETUP_ERROR
# test fail after retries
mock_set_webhook.reset_mock()
mock_set_webhook.side_effect = TimedOut("mock timeout")
await hass.config_entries.async_reload(mock_webhooks_config_entry.entry_id)
await hass.async_block_till_done()
# 3 retries
assert mock_set_webhook.call_count == 3
assert mock_webhooks_config_entry.state == ConfigEntryState.SETUP_ERROR
await hass.async_block_till_done()
async def test_set_webhooks(
hass: HomeAssistant,
mock_webhooks_config_entry: MockConfigEntry,
mock_external_calls: None,
mock_register_webhook: None,
mock_generate_secret_token,
) -> None:
"""Test set webhooks success."""
mock_webhooks_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_webhooks_config_entry.entry_id)
with (
patch(
"homeassistant.components.telegram_bot.webhooks.Bot.get_webhook_info",
AsyncMock(
return_value=WebhookInfo(
url="mock url",
last_error_date=datetime.now(),
has_custom_certificate=False,
pending_update_count=0,
)
),
) as mock_webhook_info,
patch(
"homeassistant.components.telegram_bot.webhooks.Bot.set_webhook",
AsyncMock(return_value=True),
) as mock_set_webhook,
patch(
"homeassistant.components.telegram_bot.webhooks.ApplicationBuilder"
) as application_builder_class,
):
application = application_builder_class.return_value.bot.return_value.updater.return_value.build.return_value
application.initialize = AsyncMock()
application.start = AsyncMock()
await hass.async_block_till_done()
await hass.config_entries.async_setup(mock_webhooks_config_entry.entry_id)
await hass.async_block_till_done()
await hass.async_stop()
mock_webhook_info.assert_called_once()
application.initialize.assert_called_once()
application.start.assert_called_once()
mock_set_webhook.assert_called_once()
assert mock_webhooks_config_entry.state == ConfigEntryState.LOADED
assert mock_webhooks_config_entry.state == ConfigEntryState.LOADED
async def test_webhooks_update_invalid_json(
@@ -148,3 +117,24 @@ async def test_webhooks_unauthorized_network(
await hass.async_block_till_done()
mock_remote.assert_called_once()
async def test_webhooks_deregister_failed(
hass: HomeAssistant,
webhook_platform,
mock_external_calls: None,
mock_generate_secret_token,
) -> None:
"""Test deregister webhooks."""
config_entry = hass.config_entries.async_entries(DOMAIN)[0]
assert config_entry.state == ConfigEntryState.LOADED
with patch(
"homeassistant.components.telegram_bot.webhooks.Bot.delete_webhook",
) as mock_delete_webhook:
mock_delete_webhook.side_effect = TimedOut("mock timeout")
await hass.config_entries.async_unload(config_entry.entry_id)
mock_delete_webhook.assert_called_once()
assert config_entry.state == ConfigEntryState.NOT_LOADED

View File

@@ -0,0 +1,535 @@
# serializer version: 1
# name: test_entry_diagnostics[xc40_electric_2024]
dict({
'Volvo fast interval coordinator': dict({
'centralLock': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:20:20.570000+00:00',
'unit': None,
'value': 'LOCKED',
}),
'frontLeftDoor': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:20:20.570000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'frontLeftWindow': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:28:12.202000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'frontRightDoor': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:20:20.570000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'frontRightWindow': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:28:12.202000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'hood': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:20:20.570000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'rearLeftDoor': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:20:20.570000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'rearLeftWindow': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:28:12.202000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'rearRightDoor': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:20:20.570000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'rearRightWindow': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:28:12.202000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'sunroof': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:28:12.202000+00:00',
'unit': None,
'value': 'UNSPECIFIED',
}),
'tailgate': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:20:20.570000+00:00',
'unit': None,
'value': 'CLOSED',
}),
'tankLid': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:20:20.570000+00:00',
'unit': None,
'value': 'CLOSED',
}),
}),
'Volvo medium interval coordinator': dict({
'batteryChargeLevel': dict({
'extra_data': dict({
'updated_at': '2025-07-02T08:51:23Z',
}),
'status': 'OK',
'timestamp': None,
'unit': 'percentage',
'value': 53,
}),
'chargerConnectionStatus': dict({
'extra_data': dict({
'updated_at': '2025-07-02T08:51:23Z',
}),
'status': 'OK',
'timestamp': None,
'unit': None,
'value': 'CONNECTED',
}),
'chargerPowerStatus': dict({
'extra_data': dict({
'updated_at': '2025-07-02T08:51:23Z',
}),
'status': 'OK',
'timestamp': None,
'unit': None,
'value': 'PROVIDING_POWER',
}),
'chargingCurrentLimit': dict({
'extra_data': dict({
'updated_at': '2024-03-05T08:38:44Z',
}),
'status': 'OK',
'timestamp': None,
'unit': 'ampere',
'value': 32,
}),
'chargingPower': dict({
'extra_data': dict({
'updated_at': '2025-07-02T08:51:23Z',
}),
'status': 'OK',
'timestamp': None,
'unit': 'watts',
'value': 1386,
}),
'chargingStatus': dict({
'extra_data': dict({
'updated_at': '2025-07-02T08:51:23Z',
}),
'status': 'OK',
'timestamp': None,
'unit': None,
'value': 'CHARGING',
}),
'chargingType': dict({
'extra_data': dict({
'updated_at': '2025-07-02T08:51:23Z',
}),
'status': 'OK',
'timestamp': None,
'unit': None,
'value': 'AC',
}),
'electricRange': dict({
'extra_data': dict({
'updated_at': '2025-07-02T08:51:23Z',
}),
'status': 'OK',
'timestamp': None,
'unit': 'mi',
'value': 150,
}),
'estimatedChargingTimeToTargetBatteryChargeLevel': dict({
'extra_data': dict({
'updated_at': '2025-07-02T08:51:23Z',
}),
'status': 'OK',
'timestamp': None,
'unit': 'minutes',
'value': 1440,
}),
'targetBatteryChargeLevel': dict({
'extra_data': dict({
'updated_at': '2024-09-22T09:40:12Z',
}),
'status': 'OK',
'timestamp': None,
'unit': 'percentage',
'value': 90,
}),
}),
'Volvo slow interval coordinator': dict({
'availabilityStatus': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:32:26.169000+00:00',
'unit': None,
'value': 'AVAILABLE',
}),
}),
'Volvo very slow interval coordinator': dict({
'averageEnergyConsumption': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:53:44.785000+00:00',
'unit': 'kWh/100km',
'value': 22.6,
}),
'averageSpeed': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': 'km/h',
'value': 53,
}),
'averageSpeedAutomatic': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': 'km/h',
'value': 26,
}),
'battery_capacity_kwh': dict({
'extra_data': dict({
}),
'value': 81.608,
}),
'brakeFluidLevelWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'brakeLightCenterWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'brakeLightLeftWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'brakeLightRightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'daytimeRunningLightLeftWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'daytimeRunningLightRightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'distanceToEmptyBattery': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:30:08.338000+00:00',
'unit': 'km',
'value': 250,
}),
'distanceToService': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': 'km',
'value': 29000,
}),
'engineCoolantLevelWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'engineHoursToService': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': 'h',
'value': 1266,
}),
'fogLightFrontWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'fogLightRearWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'frontLeft': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'UNSPECIFIED',
}),
'frontRight': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'UNSPECIFIED',
}),
'hazardLightsWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'UNSPECIFIED',
}),
'highBeamLeftWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'highBeamRightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'lowBeamLeftWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'lowBeamRightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'odometer': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': 'km',
'value': 30000,
}),
'oilLevelWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'positionLightFrontLeftWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'positionLightFrontRightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'positionLightRearLeftWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'positionLightRearRightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'rearLeft': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'UNSPECIFIED',
}),
'rearRight': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'UNSPECIFIED',
}),
'registrationPlateLightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'reverseLightsWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'UNSPECIFIED',
}),
'serviceWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'sideMarkLightsWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'timeToService': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': 'months',
'value': 23,
}),
'tripMeterAutomatic': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': 'km',
'value': 18.2,
}),
'tripMeterManual': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': 'km',
'value': 3822.9,
}),
'turnIndicationFrontLeftWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'turnIndicationFrontRightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'turnIndicationRearLeftWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'turnIndicationRearRightWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
'washerFluidLevelWarning': dict({
'extra_data': dict({
}),
'timestamp': '2024-12-30T14:18:56.849000+00:00',
'unit': None,
'value': 'NO_WARNING',
}),
}),
'entry_data': dict({
'api_key': '**REDACTED**',
'auth_implementation': 'volvo',
'token': dict({
'access_token': '**REDACTED**',
'expires_at': 1759919745.7328658,
'expires_in': 60,
'refresh_token': '**REDACTED**',
'token_type': 'Bearer',
}),
'vin': '**REDACTED**',
}),
'vehicle': dict({
'battery_capacity_kwh': 81.608,
'description': dict({
'extra_data': dict({
}),
'model': 'XC40',
'steering': 'LEFT',
'upholstery': 'null',
}),
'external_colour': 'Silver Dawn',
'extra_data': dict({
}),
'fuel_type': 'ELECTRIC',
'gearbox': 'AUTOMATIC',
'images': dict({
'exterior_image_url': 'https://cas.volvocars.com/image/dynamic/MY24_0000/123/exterior-v4/_/default.png?market=se&client=public-api-engineering&angle=1&bg=00000000&w=1920',
'extra_data': dict({
}),
'internal_image_url': 'https://cas.volvocars.com/image/dynamic/MY24_0000/123/interior-v4/_/default.jpg?market=se&client=public-api-engineering&angle=0&w=1920',
}),
'model_year': 2024,
'vin': '**REDACTED**',
}),
})
# ---

View File

@@ -0,0 +1,35 @@
"""Test Volvo diagnostics."""
from collections.abc import Awaitable, Callable
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.const import CONF_TOKEN
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
from tests.components.diagnostics import get_diagnostics_for_config_entry
from tests.typing import ClientSessionGenerator
@pytest.mark.usefixtures("mock_api")
async def test_entry_diagnostics(
hass: HomeAssistant,
setup_integration: Callable[[], Awaitable[bool]],
hass_client: ClientSessionGenerator,
snapshot: SnapshotAssertion,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test config entry diagnostics."""
assert await setup_integration()
await hass.async_block_till_done()
# Give it a fixed timestamp so it won't change with every test run
mock_config_entry.data[CONF_TOKEN]["expires_at"] = 1759919745.7328658
assert (
await get_diagnostics_for_config_entry(hass, hass_client, mock_config_entry)
== snapshot
)

View File

@@ -6,7 +6,7 @@ import dataclasses
from datetime import timedelta
import logging
import threading
from typing import Any
from typing import Any, override
from unittest.mock import MagicMock, PropertyMock, patch
from freezegun.api import FrozenDateTimeFactory
@@ -20,6 +20,7 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_ATTRIBUTION,
ATTR_DEVICE_CLASS,
ATTR_ENTITY_ID,
ATTR_FRIENDLY_NAME,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
@@ -2896,3 +2897,108 @@ async def test_platform_state_write_from_init_unique_id(
# The early attempt to write is interpreted as a unique ID collision
assert "Platform test_platform does not generate unique IDs." in caplog.text
assert "Entity id already exists - ignoring: test.test" not in caplog.text
async def test_included_entities_mixin(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
) -> None:
"""Test included entities attribute."""
entity_registry.async_get_or_create(
domain="hello",
platform="hello",
unique_id="very_unique_oceans",
suggested_object_id="oceans",
)
entity_registry.async_get_or_create(
domain="hello",
platform="hello",
unique_id="very_unique_continents",
suggested_object_id="continents",
)
entity_registry.async_get_or_create(
domain="hello",
platform="hello",
unique_id="very_unique_moon",
suggested_object_id="moon",
)
class MockHelloBaseClass(entity.Entity):
"""Domain base entity platform domain Hello."""
@property
@override
def state_attributes(self) -> dict[str, Any] | None:
"""Return the state attributes."""
if included_entities := getattr(self, "included_entities", None):
return {ATTR_ENTITY_ID: included_entities}
return None
class MockHelloIncludedEntitiesClass(
MockHelloBaseClass, entity.IncludedEntitiesMixin
):
""".Mock hello grouped entity class for a test integration."""
platform = MockEntityPlatform(hass, domain="hello")
mock_entity = MockHelloIncludedEntitiesClass()
mock_entity.hass = hass
mock_entity.entity_id = "hello.universe"
mock_entity.unique_id = "very_unique_universe"
await platform.async_add_entities([mock_entity])
# Initiate mock grouped entity for hello domain
mock_entity.async_set_included_entities(
"hello", ["very_unique_continents", "very_unique_oceans"]
)
mock_entity.async_schedule_update_ha_state(True)
await hass.async_block_till_done()
state = hass.states.get(mock_entity.entity_id)
assert state.attributes.get(ATTR_ENTITY_ID) == ["hello.continents", "hello.oceans"]
# Add an entity to the group of included entities
mock_entity.async_set_included_entities(
"hello", ["very_unique_continents", "very_unique_moon", "very_unique_oceans"]
)
mock_entity.async_schedule_update_ha_state(True)
await hass.async_block_till_done()
state = hass.states.get(mock_entity.entity_id)
assert state.attributes.get(ATTR_ENTITY_ID) == [
"hello.continents",
"hello.moon",
"hello.oceans",
]
# Remove an entity from the group of included entities
mock_entity.async_set_included_entities(
"hello", ["very_unique_moon", "very_unique_oceans"]
)
mock_entity.async_schedule_update_ha_state(True)
await hass.async_block_till_done()
state = hass.states.get(mock_entity.entity_id)
assert state.attributes.get(ATTR_ENTITY_ID) == ["hello.moon", "hello.oceans"]
# Rename an included entity via the registry entity
entity_registry.async_update_entity(
entity_id="hello.moon", new_entity_id="hello.moon_light"
)
await hass.async_block_till_done()
state = hass.states.get(mock_entity.entity_id)
assert state.attributes.get(ATTR_ENTITY_ID) == ["hello.moon_light", "hello.oceans"]
# Remove an included entity from the registry entity
entity_registry.async_remove(entity_id="hello.oceans")
await hass.async_block_till_done()
state = hass.states.get(mock_entity.entity_id)
assert state.attributes.get(ATTR_ENTITY_ID) == ["hello.moon_light"]