Compare commits

..

18 Commits

Author SHA1 Message Date
abmantis
1bf6771a54 Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-11-06 19:57:40 +00:00
Charlie Rusbridger
eb9849c411 Fix wrong BrowseError module in Kodi (#155971) 2025-11-06 19:18:07 +00:00
Joshua Peisach (ItzSwirlz)
93d48fae9d noaa_tides: define constants (#155949) 2025-11-06 19:13:37 +00:00
Matthias Alphart
d90a7b2345 Fix KNX Climate humidity DPT (#155942) 2025-11-06 19:09:51 +00:00
G Johansson
c2f6a364b8 Remove deprecated constant for volt ampere reactive (#155955) 2025-11-06 18:59:11 +00:00
G Johansson
bbadd92ffb Remove deprecated square meters constant (#155954) 2025-11-06 18:58:46 +00:00
Tom Monck JR
6a7de24a04 Fix args passed to check_config script (#155885) 2025-11-06 19:27:53 +02:00
G Johansson
67ccdd36fb Allow template in query in sql (#150287) 2025-11-06 17:11:46 +01:00
Andrea Turri
2ddf55a60d Miele time sensors 3/3 - Add absolute time sensors (#146055)
Co-authored-by: Erik Montnemery <erik@montnemery.com>
2025-11-06 17:09:19 +01:00
abmantis
e7a7cb829e Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-11-04 12:28:39 +00:00
abmantis
6f6b2f1ad3 Merge branch 'dev_target_triggers_conditions' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-10-15 17:03:28 +01:00
abmantis
1cc4890f75 Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-10-15 17:03:18 +01:00
Bram Kragten
d3dd9b26c9 Fixes for triggers.yaml descriptions (#153841)
Co-authored-by: Artur Pragacz <49985303+arturpragacz@users.noreply.github.com>
2025-10-09 18:00:56 +01:00
Abílio Costa
a64d61df05 Fix light trigger with new Trigger class changes (#154087) 2025-10-09 18:14:55 +02:00
abmantis
e7c6c5311d Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-10-09 15:55:39 +01:00
abmantis
72a524c868 Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-09-29 16:56:23 +01:00
abmantis
b437113f31 Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-09-29 11:18:39 +01:00
Abílio Costa
e0e263d3b5 Add state trigger to light component (#148416)
Co-authored-by: Artur Pragacz <49985303+arturpragacz@users.noreply.github.com>
2025-09-18 19:53:26 +01:00
36 changed files with 1697 additions and 235 deletions

View File

@@ -263,9 +263,6 @@ class Panel:
# Title to show in the sidebar
sidebar_title: str | None = None
# If the panel should be visible by default in the sidebar
sidebar_default_visible: bool = True
# Url to show the panel in the frontend
frontend_url_path: str
@@ -283,7 +280,6 @@ class Panel:
component_name: str,
sidebar_title: str | None,
sidebar_icon: str | None,
sidebar_default_visible: bool,
frontend_url_path: str | None,
config: dict[str, Any] | None,
require_admin: bool,
@@ -297,7 +293,6 @@ class Panel:
self.config = config
self.require_admin = require_admin
self.config_panel_domain = config_panel_domain
self.sidebar_default_visible = sidebar_default_visible
@callback
def to_response(self) -> PanelResponse:
@@ -306,7 +301,6 @@ class Panel:
"component_name": self.component_name,
"icon": self.sidebar_icon,
"title": self.sidebar_title,
"default_visible": self.sidebar_default_visible,
"config": self.config,
"url_path": self.frontend_url_path,
"require_admin": self.require_admin,
@@ -321,7 +315,6 @@ def async_register_built_in_panel(
component_name: str,
sidebar_title: str | None = None,
sidebar_icon: str | None = None,
sidebar_default_visible: bool = True,
frontend_url_path: str | None = None,
config: dict[str, Any] | None = None,
require_admin: bool = False,
@@ -334,7 +327,6 @@ def async_register_built_in_panel(
component_name,
sidebar_title,
sidebar_icon,
sidebar_default_visible,
frontend_url_path,
config,
require_admin,
@@ -752,7 +744,9 @@ class ManifestJSONView(HomeAssistantView):
@websocket_api.websocket_command(
{
"type": "frontend/get_icons",
vol.Required("category"): vol.In({"entity", "entity_component", "services"}),
vol.Required("category"): vol.In(
{"entity", "entity_component", "services", "triggers"}
),
vol.Optional("integration"): vol.All(cv.ensure_list, [str]),
}
)
@@ -887,7 +881,6 @@ class PanelResponse(TypedDict):
component_name: str
icon: str | None
title: str | None
default_visible: bool
config: dict[str, Any] | None
url_path: str
require_admin: bool
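
For context, a minimal sketch of what a panel registration looks like once the keyword above is gone; the panel name, title and icon are placeholders, following the call style used in the frontend tests later in this compare.

# Hedged sketch: registering a built-in panel after this change.
async_register_built_in_panel(
    hass,
    "example_panel",   # component_name (placeholder)
    "Example",         # sidebar_title
    "mdi:eye",         # sidebar_icon
    # sidebar_default_visible=True,  # keyword removed by this change
)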

View File

@@ -359,7 +359,7 @@ CLIMATE_KNX_SCHEMA = vol.Schema(
write=False, state_required=True, valid_dpt="9.001"
),
vol.Optional(CONF_GA_HUMIDITY_CURRENT): GASelector(
write=False, valid_dpt="9.002"
write=False, valid_dpt="9.007"
),
vol.Required(CONF_TARGET_TEMPERATURE): GroupSelect(
GroupSelectOption(

View File

@@ -221,7 +221,7 @@ async def library_payload(hass):
for child in library_info.children:
child.thumbnail = "https://brands.home-assistant.io/_/kodi/logo.png"
with contextlib.suppress(media_source.BrowseError):
with contextlib.suppress(BrowseError):
item = await media_source.async_browse_media(
hass, None, content_filter=media_source_content_filter
)

View File

@@ -25,5 +25,10 @@
"turn_on": {
"service": "mdi:lightbulb-on"
}
},
"triggers": {
"state": {
"trigger": "mdi:state-machine"
}
}
}

View File

@@ -132,6 +132,13 @@
}
},
"selector": {
"behavior": {
"options": {
"any": "Any",
"first": "First",
"last": "Last"
}
},
"color_name": {
"options": {
"aliceblue": "Alice blue",
@@ -289,6 +296,12 @@
"long": "Long",
"short": "Short"
}
},
"state": {
"options": {
"off": "[%key:common::state::off%]",
"on": "[%key:common::state::on%]"
}
}
},
"services": {
@@ -462,5 +475,22 @@
}
}
},
"title": "Light"
"title": "Light",
"triggers": {
"state": {
"description": "When the state of a light changes, such as turning on or off.",
"description_configured": "When the state of a light changes",
"fields": {
"behavior": {
"description": "The behavior of the targeted entities to trigger on.",
"name": "Behavior"
},
"state": {
"description": "The state to trigger on.",
"name": "State"
}
},
"name": "State"
}
}
}

View File

@@ -0,0 +1,152 @@
"""Provides triggers for lights."""
from typing import TYPE_CHECKING, Final, cast, override
import voluptuous as vol
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_STATE,
CONF_TARGET,
STATE_OFF,
STATE_ON,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback, split_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.event import process_state_match
from homeassistant.helpers.target import (
TargetStateChangedData,
async_track_target_selector_state_change_event,
)
from homeassistant.helpers.trigger import Trigger, TriggerActionRunner, TriggerConfig
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
# remove when #151314 is merged
CONF_OPTIONS: Final = "options"
ATTR_BEHAVIOR: Final = "behavior"
BEHAVIOR_FIRST: Final = "first"
BEHAVIOR_LAST: Final = "last"
BEHAVIOR_ANY: Final = "any"
STATE_PLATFORM_TYPE: Final = "state"
STATE_TRIGGER_SCHEMA = vol.Schema(
{
vol.Required(CONF_OPTIONS): {
vol.Required(CONF_STATE): vol.In([STATE_ON, STATE_OFF]),
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
[BEHAVIOR_FIRST, BEHAVIOR_LAST, BEHAVIOR_ANY]
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
class StateTrigger(Trigger):
"""Trigger for state changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, STATE_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the state trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
match_config_state = process_state_match(self._options.get(CONF_STATE))
def check_all_match(entity_ids: set[str]) -> bool:
"""Check if all entity states match."""
return all(
match_config_state(state.state)
for entity_id in entity_ids
if (state := self._hass.states.get(entity_id)) is not None
)
def check_one_match(entity_ids: set[str]) -> bool:
"""Check that only one entity state matches."""
return (
sum(
match_config_state(state.state)
for entity_id in entity_ids
if (state := self._hass.states.get(entity_id)) is not None
)
== 1
)
behavior = self._options.get(ATTR_BEHAVIOR)
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
if to_state is None:
return
# This check is required for "first" behavior, to check that it went from zero
# entities matching the state to one. Otherwise, if previously there were two
# entities on CONF_STATE and one changed, this would trigger.
# For "last" behavior it is not required, but serves as a quicker fail check.
if not match_config_state(to_state.state):
return
if behavior == BEHAVIOR_LAST:
if not check_all_match(target_state_change_data.targeted_entity_ids):
return
elif behavior == BEHAVIOR_FIRST:
if not check_one_match(target_state_change_data.targeted_entity_ids):
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"state of {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
TRIGGERS: dict[str, type[Trigger]] = {
STATE_PLATFORM_TYPE: StateTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for lights."""
return TRIGGERS
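
As a usage illustration only, a minimal sketch of an automation attaching the new trigger, following the schema above and the test pattern further down in this compare; the target area and notify service are placeholders.

# Hedged sketch: automation config using the new light.state trigger.
await async_setup_component(
    hass,
    "automation",
    {
        "automation": {
            "trigger": {
                "platform": "light.state",
                "target": {"area_id": "living_room"},             # placeholder target
                "options": {"state": "on", "behavior": "first"},  # fire when the first light turns on
            },
            "action": {
                "service": "notify.notify",                       # placeholder action
                "data": {"message": "First light turned on"},
            },
        }
    },
)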

View File

@@ -0,0 +1,24 @@
state:
target:
entity:
domain: light
fields:
state:
required: true
default: "on"
selector:
select:
options:
- "off"
- "on"
translation_key: state
behavior:
required: true
default: any
selector:
select:
options:
- first
- last
- any
translation_key: behavior

View File

@@ -41,6 +41,9 @@
"energy_forecast": {
"default": "mdi:lightning-bolt-outline"
},
"finish": {
"default": "mdi:clock-end"
},
"plate": {
"default": "mdi:circle-outline",
"state": {
@@ -83,6 +86,9 @@
"spin_speed": {
"default": "mdi:sync"
},
"start": {
"default": "mdi:clock-start"
},
"start_time": {
"default": "mdi:clock-start"
},

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
from collections.abc import Callable, Mapping
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
from typing import Any, Final, cast
@@ -29,6 +30,7 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.typing import StateType
from homeassistant.util import dt as dt_util
from .const import (
COFFEE_SYSTEM_PROFILE,
@@ -102,12 +104,47 @@ def _get_coffee_profile(value: MieleDevice) -> str | None:
return None
def _convert_start_timestamp(
elapsed_time_list: list[int], start_time_list: list[int]
) -> datetime | None:
"""Convert raw values representing time into start timestamp."""
now = dt_util.utcnow()
elapsed_duration = _convert_duration(elapsed_time_list)
delayed_start_duration = _convert_duration(start_time_list)
if (elapsed_duration is None or elapsed_duration == 0) and (
delayed_start_duration is None or delayed_start_duration == 0
):
return None
if elapsed_duration is not None and elapsed_duration > 0:
duration = -elapsed_duration
elif delayed_start_duration is not None and delayed_start_duration > 0:
duration = delayed_start_duration
delta = timedelta(minutes=duration)
return (now + delta).replace(second=0, microsecond=0)
def _convert_finish_timestamp(
remaining_time_list: list[int], start_time_list: list[int]
) -> datetime | None:
"""Convert raw values representing time into finish timestamp."""
now = dt_util.utcnow()
program_duration = _convert_duration(remaining_time_list)
delayed_start_duration = _convert_duration(start_time_list)
if program_duration is None or program_duration == 0:
return None
duration = program_duration + (
delayed_start_duration if delayed_start_duration is not None else 0
)
delta = timedelta(minutes=duration)
return (now + delta).replace(second=0, microsecond=0)
@dataclass(frozen=True, kw_only=True)
class MieleSensorDescription(SensorEntityDescription):
"""Class describing Miele sensor entities."""
value_fn: Callable[[MieleDevice], StateType]
end_value_fn: Callable[[StateType], StateType] | None = None
value_fn: Callable[[MieleDevice], StateType | datetime]
end_value_fn: Callable[[StateType | datetime], StateType | datetime] | None = None
extra_attributes: dict[str, Callable[[MieleDevice], StateType]] | None = None
zone: int | None = None
unique_id_fn: Callable[[str, MieleSensorDescription], str] | None = None
@@ -428,6 +465,60 @@ SENSOR_TYPES: Final[tuple[MieleSensorDefinition, ...]] = (
suggested_unit_of_measurement=UnitOfTime.HOURS,
),
),
MieleSensorDefinition(
types=(
MieleAppliance.WASHING_MACHINE,
MieleAppliance.WASHING_MACHINE_SEMI_PROFESSIONAL,
MieleAppliance.TUMBLE_DRYER,
MieleAppliance.TUMBLE_DRYER_SEMI_PROFESSIONAL,
MieleAppliance.DISHWASHER,
MieleAppliance.OVEN,
MieleAppliance.OVEN_MICROWAVE,
MieleAppliance.STEAM_OVEN,
MieleAppliance.MICROWAVE,
MieleAppliance.ROBOT_VACUUM_CLEANER,
MieleAppliance.WASHER_DRYER,
MieleAppliance.STEAM_OVEN_COMBI,
MieleAppliance.STEAM_OVEN_MICRO,
MieleAppliance.DIALOG_OVEN,
MieleAppliance.STEAM_OVEN_MK2,
),
description=MieleSensorDescription(
key="state_finish_timestamp",
translation_key="finish",
value_fn=lambda value: _convert_finish_timestamp(
value.state_remaining_time, value.state_start_time
),
device_class=SensorDeviceClass.TIMESTAMP,
entity_category=EntityCategory.DIAGNOSTIC,
),
),
MieleSensorDefinition(
types=(
MieleAppliance.WASHING_MACHINE,
MieleAppliance.TUMBLE_DRYER,
MieleAppliance.DISHWASHER,
MieleAppliance.OVEN,
MieleAppliance.OVEN_MICROWAVE,
MieleAppliance.STEAM_OVEN,
MieleAppliance.MICROWAVE,
MieleAppliance.WASHER_DRYER,
MieleAppliance.STEAM_OVEN_COMBI,
MieleAppliance.STEAM_OVEN_MICRO,
MieleAppliance.DIALOG_OVEN,
MieleAppliance.ROBOT_VACUUM_CLEANER,
MieleAppliance.STEAM_OVEN_MK2,
),
description=MieleSensorDescription(
key="state_start_timestamp",
translation_key="start",
value_fn=lambda value: _convert_start_timestamp(
value.state_elapsed_time, value.state_start_time
),
device_class=SensorDeviceClass.TIMESTAMP,
entity_category=EntityCategory.DIAGNOSTIC,
),
),
MieleSensorDefinition(
types=(
MieleAppliance.TUMBLE_DRYER_SEMI_PROFESSIONAL,
@@ -620,6 +711,8 @@ async def async_setup_entry(
"state_elapsed_time": MieleTimeSensor,
"state_remaining_time": MieleTimeSensor,
"state_start_time": MieleTimeSensor,
"state_start_timestamp": MieleAbsoluteTimeSensor,
"state_finish_timestamp": MieleAbsoluteTimeSensor,
"current_energy_consumption": MieleConsumptionSensor,
"current_water_consumption": MieleConsumptionSensor,
}.get(definition.description.key, MieleSensor)
@@ -743,7 +836,7 @@ class MieleSensor(MieleEntity, SensorEntity):
self._attr_unique_id = description.unique_id_fn(device_id, description)
@property
def native_value(self) -> StateType:
def native_value(self) -> StateType | datetime:
"""Return the state of the sensor."""
return self.entity_description.value_fn(self.device)
@@ -761,7 +854,7 @@ class MieleSensor(MieleEntity, SensorEntity):
class MieleRestorableSensor(MieleSensor, RestoreSensor):
"""Representation of a Sensor whose internal state can be restored."""
_attr_native_value: StateType
_attr_native_value: StateType | datetime
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
@@ -773,7 +866,7 @@ class MieleRestorableSensor(MieleSensor, RestoreSensor):
self._attr_native_value = last_data.native_value # type: ignore[assignment]
@property
def native_value(self) -> StateType:
def native_value(self) -> StateType | datetime:
"""Return the state of the sensor.
It is necessary to override `native_value` to fall back to the default
@@ -934,6 +1027,40 @@ class MieleTimeSensor(MieleRestorableSensor):
self._attr_native_value = current_value
class MieleAbsoluteTimeSensor(MieleRestorableSensor):
"""Representation of absolute time sensors handling precision correctness."""
_previous_value: StateType | datetime = None
def _update_native_value(self) -> None:
"""Update the last value of the sensor."""
current_value = self.entity_description.value_fn(self.device)
current_status = StateStatus(self.device.state_status)
# The API reports with minute precision, to avoid changing
# the value too often, we keep the cached value if it differs
# less than 90s from the new value
if (
isinstance(self._previous_value, datetime)
and isinstance(current_value, datetime)
and (
self._previous_value - timedelta(seconds=90)
< current_value
< self._previous_value + timedelta(seconds=90)
)
) or current_status == StateStatus.PROGRAM_ENDED:
return
# force unknown when appliance is not working (some devices are keeping last value until a new cycle starts)
if current_status in (StateStatus.OFF, StateStatus.ON, StateStatus.IDLE):
self._attr_native_value = None
# otherwise, cache value and return it
else:
self._attr_native_value = current_value
self._previous_value = current_value
class MieleConsumptionSensor(MieleRestorableSensor):
"""Representation of consumption sensors keeping state from cache."""

View File

@@ -216,6 +216,9 @@
"energy_forecast": {
"name": "Energy forecast"
},
"finish": {
"name": "Finish"
},
"plate": {
"name": "Plate {plate_no}",
"state": {
@@ -1015,6 +1018,9 @@
"spin_speed": {
"name": "Spin speed"
},
"start": {
"name": "Start"
},
"start_time": {
"name": "Start in"
},

View File

@@ -0,0 +1,11 @@
"""Constants for the NOAA Tides integration."""
from datetime import timedelta
CONF_STATION_ID = "station_id"
DEFAULT_NAME = "NOAA Tides"
DEFAULT_PREDICTION_LENGTH = timedelta(days=2)
DEFAULT_TIMEZONE = "lst_ldt"
ATTRIBUTION = "Data provided by NOAA"

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from datetime import datetime, timedelta
from datetime import datetime
import logging
from typing import TYPE_CHECKING, Any, Literal, TypedDict
@@ -22,6 +22,13 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util.unit_system import METRIC_SYSTEM
from .const import (
ATTRIBUTION,
CONF_STATION_ID,
DEFAULT_NAME,
DEFAULT_PREDICTION_LENGTH,
DEFAULT_TIMEZONE,
)
from .helpers import get_station_unique_id
if TYPE_CHECKING:
@@ -29,13 +36,6 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
CONF_STATION_ID = "station_id"
DEFAULT_NAME = "NOAA Tides"
DEFAULT_TIMEZONE = "lst_ldt"
SCAN_INTERVAL = timedelta(minutes=60)
TIMEZONES = ["gmt", "lst", "lst_ldt"]
UNIT_SYSTEMS = ["english", "metric"]
@@ -63,9 +63,9 @@ def setup_platform(
if CONF_UNIT_SYSTEM in config:
unit_system = config[CONF_UNIT_SYSTEM]
elif hass.config.units is METRIC_SYSTEM:
unit_system = UNIT_SYSTEMS[1]
unit_system = "metric"
else:
unit_system = UNIT_SYSTEMS[0]
unit_system = "english"
try:
station = coops.Station(station_id, unit_system)
@@ -97,7 +97,7 @@ class NOAATidesData(TypedDict):
class NOAATidesAndCurrentsSensor(SensorEntity):
"""Representation of a NOAA Tides and Currents sensor."""
_attr_attribution = "Data provided by NOAA"
_attr_attribution = ATTRIBUTION
def __init__(self, name, station_id, timezone, unit_system, station) -> None:
"""Initialize the sensor."""
@@ -141,8 +141,8 @@ class NOAATidesAndCurrentsSensor(SensorEntity):
return attr
@property
def native_value(self):
"""Return the state of the device."""
def native_value(self) -> str | None:
"""Return the state."""
if self.data is None:
return None
api_time = self.data["time_stamp"][0]
@@ -157,8 +157,7 @@ class NOAATidesAndCurrentsSensor(SensorEntity):
def update(self) -> None:
"""Get the latest data from NOAA Tides and Currents API."""
begin = datetime.now()
delta = timedelta(days=2)
end = begin + delta
end = begin + DEFAULT_PREDICTION_LENGTH
try:
df_predictions = self._station.get_data(
begin_date=begin.strftime("%Y%m%d %H:%M"),
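
For clarity, a short sketch of the prediction window now derived from the new constants; the import path is assumed from the const.py added in this change.

# Hedged sketch: two-day tide prediction window.
from datetime import datetime

from homeassistant.components.noaa_tides.const import DEFAULT_PREDICTION_LENGTH

begin = datetime.now()
end = begin + DEFAULT_PREDICTION_LENGTH  # timedelta(days=2)
window = (begin.strftime("%Y%m%d %H:%M"), end.strftime("%Y%m%d %H:%M"))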

View File

@@ -49,7 +49,9 @@ QUERY_SCHEMA = vol.Schema(
{
vol.Required(CONF_COLUMN_NAME): cv.string,
vol.Required(CONF_NAME): cv.template,
vol.Required(CONF_QUERY): vol.All(cv.string, validate_sql_select),
vol.Required(CONF_QUERY): vol.All(
cv.template, ValueTemplate.from_template, validate_sql_select
),
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): vol.All(
cv.template, ValueTemplate.from_template

View File

@@ -9,8 +9,6 @@ import sqlalchemy
from sqlalchemy.engine import Engine, Result
from sqlalchemy.exc import MultipleResultsFound, NoSuchColumnError, SQLAlchemyError
from sqlalchemy.orm import Session, scoped_session, sessionmaker
import sqlparse
from sqlparse.exceptions import SQLParseError
import voluptuous as vol
from homeassistant.components.recorder import CONF_DB_URL, get_instance
@@ -31,21 +29,28 @@ from homeassistant.const import (
CONF_UNIT_OF_MEASUREMENT,
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import callback
from homeassistant.core import async_get_hass, callback
from homeassistant.data_entry_flow import section
from homeassistant.exceptions import TemplateError
from homeassistant.helpers import selector
from .const import CONF_ADVANCED_OPTIONS, CONF_COLUMN_NAME, CONF_QUERY, DOMAIN
from .util import resolve_db_url
from .util import (
EmptyQueryError,
InvalidSqlQuery,
MultipleQueryError,
NotSelectQueryError,
UnknownQueryTypeError,
check_and_render_sql_query,
resolve_db_url,
)
_LOGGER = logging.getLogger(__name__)
OPTIONS_SCHEMA: vol.Schema = vol.Schema(
{
vol.Required(CONF_QUERY): selector.TextSelector(
selector.TextSelectorConfig(multiline=True)
),
vol.Required(CONF_QUERY): selector.TemplateSelector(),
vol.Required(CONF_COLUMN_NAME): selector.TextSelector(),
vol.Required(CONF_ADVANCED_OPTIONS): section(
vol.Schema(
@@ -89,14 +94,12 @@ CONFIG_SCHEMA: vol.Schema = vol.Schema(
def validate_sql_select(value: str) -> str:
"""Validate that value is a SQL SELECT query."""
if len(query := sqlparse.parse(value.lstrip().lstrip(";"))) > 1:
raise MultipleResultsFound
if len(query) == 0 or (query_type := query[0].get_type()) == "UNKNOWN":
raise ValueError
if query_type != "SELECT":
_LOGGER.debug("The SQL query %s is of type %s", query, query_type)
raise SQLParseError
return str(query[0])
hass = async_get_hass()
try:
return check_and_render_sql_query(hass, value)
except (TemplateError, InvalidSqlQuery) as err:
_LOGGER.debug("Invalid query '%s' results in '%s'", value, err.args[0])
raise
def validate_db_connection(db_url: str) -> bool:
@@ -138,7 +141,7 @@ def validate_query(db_url: str, query: str, column: str) -> bool:
if sess:
sess.close()
engine.dispose()
raise ValueError(error) from error
raise InvalidSqlQuery from error
for res in result.mappings():
if column not in res:
@@ -224,13 +227,13 @@ class SQLConfigFlow(ConfigFlow, domain=DOMAIN):
except NoSuchColumnError:
errors["column"] = "column_invalid"
description_placeholders = {"column": column}
except MultipleResultsFound:
except (MultipleResultsFound, MultipleQueryError):
errors["query"] = "multiple_queries"
except SQLAlchemyError:
errors["db_url"] = "db_url_invalid"
except SQLParseError:
except (NotSelectQueryError, UnknownQueryTypeError):
errors["query"] = "query_no_read_only"
except ValueError as err:
except (TemplateError, EmptyQueryError, InvalidSqlQuery) as err:
_LOGGER.debug("Invalid query: %s", err)
errors["query"] = "query_invalid"
@@ -282,13 +285,13 @@ class SQLOptionsFlowHandler(OptionsFlowWithReload):
except NoSuchColumnError:
errors["column"] = "column_invalid"
description_placeholders = {"column": column}
except MultipleResultsFound:
except (MultipleResultsFound, MultipleQueryError):
errors["query"] = "multiple_queries"
except SQLAlchemyError:
errors["db_url"] = "db_url_invalid"
except SQLParseError:
except (NotSelectQueryError, UnknownQueryTypeError):
errors["query"] = "query_no_read_only"
except ValueError as err:
except (TemplateError, EmptyQueryError, InvalidSqlQuery) as err:
_LOGGER.debug("Invalid query: %s", err)
errors["query"] = "query_invalid"
else:

View File

@@ -22,7 +22,7 @@ from homeassistant.const import (
MATCH_ALL,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
from homeassistant.exceptions import PlatformNotReady, TemplateError
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import (
AddConfigEntryEntitiesCallback,
@@ -40,7 +40,9 @@ from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import CONF_ADVANCED_OPTIONS, CONF_COLUMN_NAME, CONF_QUERY, DOMAIN
from .util import (
InvalidSqlQuery,
async_create_sessionmaker,
check_and_render_sql_query,
convert_value,
generate_lambda_stmt,
redact_credentials,
@@ -81,7 +83,7 @@ async def async_setup_platform(
return
name: Template = conf[CONF_NAME]
query_str: str = conf[CONF_QUERY]
query_template: ValueTemplate = conf[CONF_QUERY]
value_template: ValueTemplate | None = conf.get(CONF_VALUE_TEMPLATE)
column_name: str = conf[CONF_COLUMN_NAME]
unique_id: str | None = conf.get(CONF_UNIQUE_ID)
@@ -96,7 +98,7 @@ async def async_setup_platform(
await async_setup_sensor(
hass,
trigger_entity_config,
query_str,
query_template,
column_name,
value_template,
unique_id,
@@ -119,6 +121,13 @@ async def async_setup_entry(
template: str | None = entry.options[CONF_ADVANCED_OPTIONS].get(CONF_VALUE_TEMPLATE)
column_name: str = entry.options[CONF_COLUMN_NAME]
query_template: ValueTemplate | None = None
try:
query_template = ValueTemplate(query_str, hass)
query_template.ensure_valid()
except TemplateError as err:
raise PlatformNotReady("Invalid SQL query template") from err
value_template: ValueTemplate | None = None
if template is not None:
try:
@@ -137,7 +146,7 @@ async def async_setup_entry(
await async_setup_sensor(
hass,
trigger_entity_config,
query_str,
query_template,
column_name,
value_template,
entry.entry_id,
@@ -150,7 +159,7 @@ async def async_setup_entry(
async def async_setup_sensor(
hass: HomeAssistant,
trigger_entity_config: ConfigType,
query_str: str,
query_template: ValueTemplate,
column_name: str,
value_template: ValueTemplate | None,
unique_id: str | None,
@@ -166,22 +175,25 @@ async def async_setup_sensor(
) = await async_create_sessionmaker(hass, db_url)
if sessmaker is None:
return
validate_query(hass, query_str, uses_recorder_db, unique_id)
validate_query(hass, query_template, uses_recorder_db, unique_id)
query_str = check_and_render_sql_query(hass, query_template)
upper_query = query_str.upper()
# MSSQL uses TOP and not LIMIT
mod_query_template = query_template
if not ("LIMIT" in upper_query or "SELECT TOP" in upper_query):
if "mssql" in db_url:
query_str = upper_query.replace("SELECT", "SELECT TOP 1")
_query = query_template.template.replace("SELECT", "SELECT TOP 1")
else:
query_str = query_str.replace(";", "") + " LIMIT 1;"
_query = query_template.template.replace(";", "") + " LIMIT 1;"
mod_query_template = ValueTemplate(_query, hass)
async_add_entities(
[
SQLSensor(
trigger_entity_config,
sessmaker,
query_str,
mod_query_template,
column_name,
value_template,
yaml,
@@ -200,7 +212,7 @@ class SQLSensor(ManualTriggerSensorEntity):
self,
trigger_entity_config: ConfigType,
sessmaker: scoped_session,
query: str,
query: ValueTemplate,
column: str,
value_template: ValueTemplate | None,
yaml: bool,
@@ -214,7 +226,6 @@ class SQLSensor(ManualTriggerSensorEntity):
self.sessionmaker = sessmaker
self._attr_extra_state_attributes = {}
self._use_database_executor = use_database_executor
self._lambda_stmt = generate_lambda_stmt(query)
if not yaml and (unique_id := trigger_entity_config.get(CONF_UNIQUE_ID)):
self._attr_name = None
self._attr_has_entity_name = True
@@ -255,11 +266,22 @@ class SQLSensor(ManualTriggerSensorEntity):
self._attr_extra_state_attributes = {}
sess: scoped_session = self.sessionmaker()
try:
result: Result = sess.execute(self._lambda_stmt)
rendered_query = check_and_render_sql_query(self.hass, self._query)
_lambda_stmt = generate_lambda_stmt(rendered_query)
result: Result = sess.execute(_lambda_stmt)
except (TemplateError, InvalidSqlQuery) as err:
_LOGGER.error(
"Error rendering query %s: %s",
redact_credentials(self._query.template),
redact_credentials(str(err)),
)
sess.rollback()
sess.close()
return
except SQLAlchemyError as err:
_LOGGER.error(
"Error executing query %s: %s",
self._query,
rendered_query,
redact_credentials(str(err)),
)
sess.rollback()
@@ -267,7 +289,7 @@ class SQLSensor(ManualTriggerSensorEntity):
return
for res in result.mappings():
_LOGGER.debug("Query %s result in %s", self._query, res.items())
_LOGGER.debug("Query %s result in %s", rendered_query, res.items())
data = res[self._column_name]
for key, value in res.items():
self._attr_extra_state_attributes[key] = convert_value(value)
@@ -287,6 +309,6 @@ class SQLSensor(ManualTriggerSensorEntity):
self._attr_native_value = data
if data is None:
_LOGGER.warning("%s returned no results", self._query)
_LOGGER.warning("%s returned no results", rendered_query)
sess.close()
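
A simplified sketch of the LIMIT guard above, applied to a plain string for brevity; the real code edits query_template.template and wraps the result back into a ValueTemplate, and the query text here is made up.

# Hedged sketch: amending a rendered query without LIMIT/TOP (non-MSSQL path).
query = "SELECT state FROM states WHERE entity_id = 'sensor.example'"
upper_query = query.upper()
if not ("LIMIT" in upper_query or "SELECT TOP" in upper_query):
    query = query.replace(";", "") + " LIMIT 1;"
# -> "SELECT state FROM states WHERE entity_id = 'sensor.example' LIMIT 1;"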

View File

@@ -19,11 +19,13 @@ from homeassistant.core import (
)
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.trigger_template_entity import ValueTemplate
from homeassistant.util.json import JsonValueType
from .const import CONF_QUERY, DOMAIN
from .util import (
async_create_sessionmaker,
check_and_render_sql_query,
convert_value,
generate_lambda_stmt,
redact_credentials,
@@ -37,7 +39,9 @@ _LOGGER = logging.getLogger(__name__)
SERVICE_QUERY = "query"
SERVICE_QUERY_SCHEMA = vol.Schema(
{
vol.Required(CONF_QUERY): vol.All(cv.string, validate_sql_select),
vol.Required(CONF_QUERY): vol.All(
cv.template, ValueTemplate.from_template, validate_sql_select
),
vol.Optional(CONF_DB_URL): cv.string,
}
)
@@ -72,8 +76,9 @@ async def _async_query_service(
def _execute_and_convert_query() -> list[JsonValueType]:
"""Execute the query and return the results with converted types."""
sess: Session = sessmaker()
rendered_query = check_and_render_sql_query(call.hass, query_str)
try:
result: Result = sess.execute(generate_lambda_stmt(query_str))
result: Result = sess.execute(generate_lambda_stmt(rendered_query))
except SQLAlchemyError as err:
_LOGGER.debug(
"Error executing query %s: %s",

View File

@@ -8,7 +8,7 @@
"db_url_invalid": "Database URL invalid",
"multiple_queries": "Multiple SQL queries are not supported",
"query_invalid": "SQL query invalid",
"query_no_read_only": "SQL query must be read-only"
"query_no_read_only": "SQL query is not a read-only SELECT query or it's of an unknown type"
},
"step": {
"options": {

View File

@@ -19,7 +19,9 @@ import voluptuous as vol
from homeassistant.components.recorder import SupportedDialect, get_instance
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError, TemplateError
from homeassistant.helpers import issue_registry as ir
from homeassistant.helpers.template import Template
from .const import DB_URL_RE, DOMAIN
from .models import SQLData
@@ -44,16 +46,14 @@ def resolve_db_url(hass: HomeAssistant, db_url: str | None) -> str:
return get_instance(hass).db_url
def validate_sql_select(value: str) -> str:
def validate_sql_select(value: Template) -> Template:
"""Validate that value is a SQL SELECT query."""
if len(query := sqlparse.parse(value.lstrip().lstrip(";"))) > 1:
raise vol.Invalid("Multiple SQL queries are not supported")
if len(query) == 0 or (query_type := query[0].get_type()) == "UNKNOWN":
raise vol.Invalid("Invalid SQL query")
if query_type != "SELECT":
_LOGGER.debug("The SQL query %s is of type %s", query, query_type)
raise vol.Invalid("Only SELECT queries allowed")
return str(query[0])
try:
assert value.hass
check_and_render_sql_query(value.hass, value)
except (TemplateError, InvalidSqlQuery) as err:
raise vol.Invalid(str(err)) from err
return value
async def async_create_sessionmaker(
@@ -113,7 +113,7 @@ async def async_create_sessionmaker(
def validate_query(
hass: HomeAssistant,
query_str: str,
query_template: str | Template,
uses_recorder_db: bool,
unique_id: str | None = None,
) -> None:
@@ -121,7 +121,7 @@ def validate_query(
Args:
hass: The Home Assistant instance.
query_str: The SQL query string to be validated.
query_template: The SQL query string to be validated.
uses_recorder_db: A boolean indicating if the query is against the recorder database.
unique_id: The unique ID of the entity, used for creating issue registry keys.
@@ -131,6 +131,10 @@ def validate_query(
"""
if not uses_recorder_db:
return
if isinstance(query_template, Template):
query_str = query_template.async_render()
else:
query_str = Template(query_template, hass).async_render()
redacted_query = redact_credentials(query_str)
issue_key = unique_id if unique_id else redacted_query
@@ -239,3 +243,49 @@ def convert_value(value: Any) -> Any:
return f"0x{value.hex()}"
case _:
return value
def check_and_render_sql_query(hass: HomeAssistant, query: Template | str) -> str:
"""Check and render SQL query."""
if isinstance(query, str):
query = query.strip()
if not query:
raise EmptyQueryError("Query cannot be empty")
query = Template(query, hass=hass)
# Raises TemplateError if template is invalid
query.ensure_valid()
rendered_query: str = query.async_render()
if len(rendered_queries := sqlparse.parse(rendered_query.lstrip().lstrip(";"))) > 1:
raise MultipleQueryError("Multiple SQL statements are not allowed")
if (
len(rendered_queries) == 0
or (query_type := rendered_queries[0].get_type()) == "UNKNOWN"
):
raise UnknownQueryTypeError("SQL query is empty or unknown type")
if query_type != "SELECT":
_LOGGER.debug("The SQL query %s is of type %s", rendered_query, query_type)
raise NotSelectQueryError("SQL query must be of type SELECT")
return str(rendered_queries[0])
class InvalidSqlQuery(HomeAssistantError):
"""SQL query is invalid error."""
class EmptyQueryError(InvalidSqlQuery):
"""SQL query is empty error."""
class MultipleQueryError(InvalidSqlQuery):
"""SQL query is multiple error."""
class UnknownQueryTypeError(InvalidSqlQuery):
"""SQL query is of unknown type error."""
class NotSelectQueryError(InvalidSqlQuery):
"""SQL query is not a SELECT statement error."""

View File

@@ -487,13 +487,6 @@ class UnitOfReactivePower(StrEnum):
KILO_VOLT_AMPERE_REACTIVE = "kvar"
_DEPRECATED_POWER_VOLT_AMPERE_REACTIVE: Final = DeprecatedConstantEnum(
UnitOfReactivePower.VOLT_AMPERE_REACTIVE,
"2025.9",
)
"""Deprecated: please use UnitOfReactivePower.VOLT_AMPERE_REACTIVE."""
# Energy units
class UnitOfEnergy(StrEnum):
"""Energy units."""
@@ -685,13 +678,6 @@ class UnitOfArea(StrEnum):
HECTARES = "ha"
_DEPRECATED_AREA_SQUARE_METERS: Final = DeprecatedConstantEnum(
UnitOfArea.SQUARE_METERS,
"2025.12",
)
"""Deprecated: please use UnitOfArea.SQUARE_METERS"""
# Mass units
class UnitOfMass(StrEnum):
"""Mass units."""

View File

@@ -806,6 +806,9 @@ async def async_get_all_descriptions(
description = {"fields": yaml_description.get("fields", {})}
if (target := yaml_description.get("target")) is not None:
description["target"] = target
new_descriptions_cache[missing_trigger] = description
hass.data[TRIGGER_DESCRIPTION_CACHE] = new_descriptions_cache

View File

@@ -90,7 +90,9 @@ def run(script_args: list) -> int:
help="Exit non-zero if warnings are present",
)
args, unknown = parser.parse_known_args(script_args)
# Parse all args including --config & --script. Do not use script_args.
# Example: python -m homeassistant --config "." --script check_config
args, unknown = parser.parse_known_args()
if unknown:
print(color("red", "Unknown arguments:", ", ".join(unknown)))

View File

@@ -645,7 +645,6 @@ async def test_get_panels(
assert msg["result"]["map"]["icon"] == "mdi:tooltip-account"
assert msg["result"]["map"]["title"] == "Map"
assert msg["result"]["map"]["require_admin"] is True
assert msg["result"]["map"]["default_visible"] is True
async_remove_panel(hass, "map")
@@ -686,45 +685,6 @@ async def test_get_panels_non_admin(
assert "map" not in msg["result"]
async def test_panel_sidebar_default_visible(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_http_client: TestClient,
) -> None:
"""Test sidebar_default_visible property in panels."""
async_register_built_in_panel(
hass,
"default_panel",
"Default Panel",
)
async_register_built_in_panel(
hass,
"visible_panel",
"Visible Panel",
"mdi:eye",
sidebar_default_visible=True,
)
async_register_built_in_panel(
hass,
"hidden_panel",
"Hidden Panel",
"mdi:eye-off",
sidebar_default_visible=False,
)
client = await hass_ws_client(hass)
await client.send_json({"id": 5, "type": "get_panels"})
msg = await client.receive_json()
assert msg["id"] == 5
assert msg["type"] == TYPE_RESULT
assert msg["success"]
assert msg["result"]["default_panel"]["default_visible"] is True
assert msg["result"]["visible_panel"]["default_visible"] is True
assert msg["result"]["hidden_panel"]["default_visible"] is False
async def test_get_translations(ws_client: MockHAClientWebSocket) -> None:
"""Test get_translations command."""
with patch(

View File

@@ -253,7 +253,9 @@ async def test_setup_api_panel(
"component_name": "custom",
"icon": None,
"title": None,
"default_visible": True,
"url_path": "hassio",
"require_admin": True,
"config_panel_domain": None,
"config": {
"_panel_custom": {
"embed_iframe": True,
@@ -262,9 +264,6 @@ async def test_setup_api_panel(
"trust_external": False,
}
},
"url_path": "hassio",
"require_admin": True,
"config_panel_domain": None,
}

View File

@@ -122,7 +122,7 @@
'validDPTs': list([
dict({
'main': 9,
'sub': 2,
'sub': 7,
}),
]),
'write': False,

View File

@@ -0,0 +1,283 @@
"""Test light trigger."""
import pytest
from homeassistant.components import automation
from homeassistant.const import (
ATTR_AREA_ID,
ATTR_DEVICE_ID,
ATTR_FLOOR_ID,
ATTR_LABEL_ID,
CONF_ENTITY_ID,
CONF_PLATFORM,
CONF_STATE,
CONF_TARGET,
STATE_OFF,
STATE_ON,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import (
area_registry as ar,
device_registry as dr,
entity_registry as er,
floor_registry as fr,
label_registry as lr,
)
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry, mock_device_registry
# remove when #151314 is merged
CONF_OPTIONS = "options"
@pytest.fixture(autouse=True, name="stub_blueprint_populate")
def stub_blueprint_populate_autouse(stub_blueprint_populate: None) -> None:
"""Stub copying the blueprints to the config folder."""
@pytest.fixture
async def target_lights(hass: HomeAssistant) -> None:
"""Create multiple light entities associated with different targets."""
await async_setup_component(hass, "light", {})
config_entry = MockConfigEntry(domain="test")
config_entry.add_to_hass(hass)
floor_reg = fr.async_get(hass)
floor = floor_reg.async_create("Test Floor")
area_reg = ar.async_get(hass)
area = area_reg.async_create("Test Area", floor_id=floor.floor_id)
label_reg = lr.async_get(hass)
label = label_reg.async_create("Test Label")
device = dr.DeviceEntry(id="test_device", area_id=area.id, labels={label.label_id})
mock_device_registry(hass, {device.id: device})
entity_reg = er.async_get(hass)
# Light associated with area
light_area = entity_reg.async_get_or_create(
domain="light",
platform="test",
unique_id="light_area",
suggested_object_id="area_light",
)
entity_reg.async_update_entity(light_area.entity_id, area_id=area.id)
# Light associated with device
entity_reg.async_get_or_create(
domain="light",
platform="test",
unique_id="light_device",
suggested_object_id="device_light",
device_id=device.id,
)
# Light associated with label
light_label = entity_reg.async_get_or_create(
domain="light",
platform="test",
unique_id="light_label",
suggested_object_id="label_light",
)
entity_reg.async_update_entity(light_label.entity_id, labels={label.label_id})
# Return all available light entities
return [
"light.standalone_light",
"light.label_light",
"light.area_light",
"light.device_light",
]
@pytest.mark.usefixtures("target_lights")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id"),
[
({CONF_ENTITY_ID: "light.standalone_light"}, "light.standalone_light"),
({ATTR_LABEL_ID: "test_label"}, "light.label_light"),
({ATTR_AREA_ID: "test_area"}, "light.area_light"),
({ATTR_FLOOR_ID: "test_floor"}, "light.area_light"),
({ATTR_LABEL_ID: "test_label"}, "light.device_light"),
({ATTR_AREA_ID: "test_area"}, "light.device_light"),
({ATTR_FLOOR_ID: "test_floor"}, "light.device_light"),
({ATTR_DEVICE_ID: "test_device"}, "light.device_light"),
],
)
@pytest.mark.parametrize(
("state", "reverse_state"), [(STATE_ON, STATE_OFF), (STATE_OFF, STATE_ON)]
)
async def test_light_state_trigger_behavior_any(
hass: HomeAssistant,
service_calls: list[ServiceCall],
trigger_target_config: dict,
entity_id: str,
state: str,
reverse_state: str,
) -> None:
"""Test that the light state trigger fires when any light state changes to a specific state."""
await async_setup_component(hass, "light", {})
hass.states.async_set(entity_id, reverse_state)
await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
CONF_PLATFORM: "light.state",
CONF_TARGET: {**trigger_target_config},
CONF_OPTIONS: {CONF_STATE: state},
},
"action": {
"service": "test.automation",
"data_template": {CONF_ENTITY_ID: f"{entity_id}"},
},
}
},
)
hass.states.async_set(entity_id, state)
await hass.async_block_till_done()
assert len(service_calls) == 1
assert service_calls[0].data[CONF_ENTITY_ID] == entity_id
service_calls.clear()
hass.states.async_set(entity_id, reverse_state)
await hass.async_block_till_done()
assert len(service_calls) == 0
@pytest.mark.parametrize(
("trigger_target_config", "entity_id"),
[
({CONF_ENTITY_ID: "light.standalone_light"}, "light.standalone_light"),
({ATTR_LABEL_ID: "test_label"}, "light.label_light"),
({ATTR_AREA_ID: "test_area"}, "light.area_light"),
({ATTR_FLOOR_ID: "test_floor"}, "light.area_light"),
({ATTR_LABEL_ID: "test_label"}, "light.device_light"),
({ATTR_AREA_ID: "test_area"}, "light.device_light"),
({ATTR_FLOOR_ID: "test_floor"}, "light.device_light"),
({ATTR_DEVICE_ID: "test_device"}, "light.device_light"),
],
)
@pytest.mark.parametrize(
("state", "reverse_state"), [(STATE_ON, STATE_OFF), (STATE_OFF, STATE_ON)]
)
async def test_light_state_trigger_behavior_first(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_lights: list[str],
trigger_target_config: dict,
entity_id: str,
state: str,
reverse_state: str,
) -> None:
"""Test that the light state trigger fires when the first light changes to a specific state."""
await async_setup_component(hass, "light", {})
for other_entity_id in target_lights:
hass.states.async_set(other_entity_id, reverse_state)
await hass.async_block_till_done()
await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
CONF_PLATFORM: "light.state",
CONF_TARGET: {**trigger_target_config},
CONF_OPTIONS: {CONF_STATE: state, "behavior": "first"},
},
"action": {
"service": "test.automation",
"data_template": {CONF_ENTITY_ID: f"{entity_id}"},
},
}
},
)
hass.states.async_set(entity_id, state)
await hass.async_block_till_done()
assert len(service_calls) == 1
assert service_calls[0].data[CONF_ENTITY_ID] == entity_id
service_calls.clear()
# Triggering other lights should not cause any service calls after the first one
for other_entity_id in target_lights:
hass.states.async_set(other_entity_id, state)
await hass.async_block_till_done()
for other_entity_id in target_lights:
hass.states.async_set(other_entity_id, reverse_state)
await hass.async_block_till_done()
assert len(service_calls) == 0
hass.states.async_set(entity_id, state)
await hass.async_block_till_done()
assert len(service_calls) == 1
assert service_calls[0].data[CONF_ENTITY_ID] == entity_id
@pytest.mark.parametrize(
("trigger_target_config", "entity_id"),
[
({CONF_ENTITY_ID: "light.standalone_light"}, "light.standalone_light"),
({ATTR_LABEL_ID: "test_label"}, "light.label_light"),
({ATTR_AREA_ID: "test_area"}, "light.area_light"),
({ATTR_FLOOR_ID: "test_floor"}, "light.area_light"),
({ATTR_LABEL_ID: "test_label"}, "light.device_light"),
({ATTR_AREA_ID: "test_area"}, "light.device_light"),
({ATTR_FLOOR_ID: "test_floor"}, "light.device_light"),
({ATTR_DEVICE_ID: "test_device"}, "light.device_light"),
],
)
@pytest.mark.parametrize(
("state", "reverse_state"), [(STATE_ON, STATE_OFF), (STATE_OFF, STATE_ON)]
)
async def test_light_state_trigger_behavior_last(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_lights: list[str],
trigger_target_config: dict,
entity_id: str,
state: str,
reverse_state: str,
) -> None:
"""Test that the light state trigger fires when the last light changes to a specific state."""
await async_setup_component(hass, "light", {})
for other_entity_id in target_lights:
hass.states.async_set(other_entity_id, reverse_state)
await hass.async_block_till_done()
await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
CONF_PLATFORM: "light.state",
CONF_TARGET: {**trigger_target_config},
CONF_OPTIONS: {CONF_STATE: state, "behavior": "last"},
},
"action": {
"service": "test.automation",
"data_template": {CONF_ENTITY_ID: f"{entity_id}"},
},
}
},
)
target_lights.remove(entity_id)
for other_entity_id in target_lights:
hass.states.async_set(other_entity_id, state)
await hass.async_block_till_done()
assert len(service_calls) == 0
hass.states.async_set(entity_id, state)
await hass.async_block_till_done()
assert len(service_calls) == 1

View File

@@ -2873,6 +2873,55 @@
'state': 'unknown',
})
# ---
# name: test_sensor_states[platforms0][sensor.oven_finish-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.oven_finish',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Finish',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'finish',
'unique_id': 'DummyAppliance_12-state_finish_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_sensor_states[platforms0][sensor.oven_finish-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Oven Finish',
}),
'context': <ANY>,
'entity_id': 'sensor.oven_finish',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_sensor_states[platforms0][sensor.oven_program-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -3422,6 +3471,55 @@
'state': 'unknown',
})
# ---
# name: test_sensor_states[platforms0][sensor.oven_start-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.oven_start',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Start',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'start',
'unique_id': 'DummyAppliance_12-state_start_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_sensor_states[platforms0][sensor.oven_start-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Oven Start',
}),
'context': <ANY>,
'entity_id': 'sensor.oven_start',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_sensor_states[platforms0][sensor.oven_start_in-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -3986,6 +4084,55 @@
'state': '10.0',
})
# ---
# name: test_sensor_states[platforms0][sensor.washing_machine_finish-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.washing_machine_finish',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Finish',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'finish',
'unique_id': 'Dummy_Appliance_3-state_finish_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_sensor_states[platforms0][sensor.washing_machine_finish-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Washing machine Finish',
}),
'context': <ANY>,
'entity_id': 'sensor.washing_machine_finish',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_sensor_states[platforms0][sensor.washing_machine_program-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -4366,6 +4513,55 @@
'state': 'unknown',
})
# ---
# name: test_sensor_states[platforms0][sensor.washing_machine_start-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.washing_machine_start',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Start',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'start',
'unique_id': 'Dummy_Appliance_3-state_start_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_sensor_states[platforms0][sensor.washing_machine_start-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Washing machine Start',
}),
'context': <ANY>,
'entity_id': 'sensor.washing_machine_start',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_sensor_states[platforms0][sensor.washing_machine_start_in-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -5021,6 +5217,55 @@
'state': '0',
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.oven_finish-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.oven_finish',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Finish',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'finish',
'unique_id': 'DummyAppliance_12-state_finish_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.oven_finish-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Oven Finish',
}),
'context': <ANY>,
'entity_id': 'sensor.oven_finish',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '2025-05-31T12:35:00+00:00',
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.oven_program-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -5570,6 +5815,55 @@
'state': '5',
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.oven_start-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.oven_start',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Start',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'start',
'unique_id': 'DummyAppliance_12-state_start_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.oven_start-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Oven Start',
}),
'context': <ANY>,
'entity_id': 'sensor.oven_start',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.oven_start_in-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -6134,6 +6428,55 @@
'state': '10.0',
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.washing_machine_finish-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.washing_machine_finish',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Finish',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'finish',
'unique_id': 'Dummy_Appliance_3-state_finish_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.washing_machine_finish-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Washing machine Finish',
}),
'context': <ANY>,
'entity_id': 'sensor.washing_machine_finish',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.washing_machine_program-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -6514,6 +6857,55 @@
'state': 'unknown',
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.washing_machine_start-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.washing_machine_start',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Start',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'start',
'unique_id': 'Dummy_Appliance_3-state_start_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.washing_machine_start-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Washing machine Start',
}),
'context': <ANY>,
'entity_id': 'sensor.washing_machine_start',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_sensor_states_api_push[platforms0][sensor.washing_machine_start_in-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -6925,6 +7317,55 @@
'state': 'unknown',
})
# ---
# name: test_vacuum_sensor_states[platforms0-vacuum_device.json][sensor.robot_vacuum_cleaner_finish-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.robot_vacuum_cleaner_finish',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Finish',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'finish',
'unique_id': 'Dummy_Vacuum_1-state_finish_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_vacuum_sensor_states[platforms0-vacuum_device.json][sensor.robot_vacuum_cleaner_finish-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Robot vacuum cleaner Finish',
}),
'context': <ANY>,
'entity_id': 'sensor.robot_vacuum_cleaner_finish',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---
# name: test_vacuum_sensor_states[platforms0-vacuum_device.json][sensor.robot_vacuum_cleaner_program-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
@@ -7106,3 +7547,52 @@
'state': 'unknown',
})
# ---
# name: test_vacuum_sensor_states[platforms0-vacuum_device.json][sensor.robot_vacuum_cleaner_start-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.robot_vacuum_cleaner_start',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
'original_icon': None,
'original_name': 'Start',
'platform': 'miele',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'start',
'unique_id': 'Dummy_Vacuum_1-state_start_timestamp',
'unit_of_measurement': None,
})
# ---
# name: test_vacuum_sensor_states[platforms0-vacuum_device.json][sensor.robot_vacuum_cleaner_start-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'timestamp',
'friendly_name': 'Robot vacuum cleaner Start',
}),
'context': <ANY>,
'entity_id': 'sensor.robot_vacuum_cleaner_start',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
})
# ---

View File

@@ -1,6 +1,6 @@
"""Tests for miele sensor module."""
from datetime import timedelta
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock
from freezegun.api import FrozenDateTimeFactory
@@ -23,6 +23,7 @@ from tests.common import (
)
@pytest.mark.freeze_time("2025-05-31 12:30:00+00:00")
@pytest.mark.parametrize("platforms", [(SENSOR_DOMAIN,)])
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_sensor_states(
@@ -37,6 +38,7 @@ async def test_sensor_states(
await snapshot_platform(hass, entity_registry, snapshot, setup_platform.entry_id)
@pytest.mark.freeze_time("2025-05-31 12:30:00+00:00")
@pytest.mark.parametrize("platforms", [(SENSOR_DOMAIN,)])
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_sensor_states_api_push(
@@ -302,6 +304,7 @@ async def test_laundry_wash_scenario(
"""Parametrized test for verifying time sensors for wahsing machine devices when API glitches at program end."""
step = 0
freezer.move_to("2025-05-31T12:00:00+00:00")
# Initial state when the washing machine is off
check_sensor_state(hass, "sensor.washing_machine", "off", step)
@@ -317,6 +320,8 @@ async def test_laundry_wash_scenario(
check_sensor_state(hass, "sensor.washing_machine_remaining_time", "unknown", step)
# OFF -> elapsed forced to unknown (some devices continue reporting last value of last cycle)
check_sensor_state(hass, "sensor.washing_machine_elapsed_time", "unknown", step)
check_sensor_state(hass, "sensor.washing_machine_start", "unknown", step)
check_sensor_state(hass, "sensor.washing_machine_finish", "unknown", step)
# consumption sensors have to report "unknown" when the device is not working
check_sensor_state(
hass, "sensor.washing_machine_energy_consumption", "unknown", step
@@ -357,7 +362,7 @@ async def test_laundry_wash_scenario(
},
}
freezer.tick(timedelta(seconds=130))
freezer.move_to("2025-05-31T12:30:00+00:00")
async_fire_time_changed(hass)
await hass.async_block_till_done()
@@ -376,8 +381,12 @@ async def test_laundry_wash_scenario(
"unit": "l",
},
}
device_fixture["DummyWasher"]["state"]["elapsedTime"][0] = 0
device_fixture["DummyWasher"]["state"]["elapsedTime"][1] = 14
device_fixture["DummyWasher"]["state"]["remainingTime"][0] = 1
device_fixture["DummyWasher"]["state"]["remainingTime"][1] = 43
freezer.tick(timedelta(seconds=130))
freezer.move_to("2025-05-31T12:32:00+00:00")
async_fire_time_changed(hass)
await hass.async_block_till_done()
@@ -389,8 +398,14 @@ async def test_laundry_wash_scenario(
check_sensor_state(hass, "sensor.washing_machine_target_temperature", "30.0", step)
check_sensor_state(hass, "sensor.washing_machine_spin_speed", "1200", step)
# IN_USE -> elapsed, remaining time from API (normal case)
check_sensor_state(hass, "sensor.washing_machine_remaining_time", "105", step)
check_sensor_state(hass, "sensor.washing_machine_elapsed_time", "12", step)
check_sensor_state(hass, "sensor.washing_machine_remaining_time", "103", step)
check_sensor_state(hass, "sensor.washing_machine_elapsed_time", "14", step)
check_sensor_state(
hass, "sensor.washing_machine_start", "2025-05-31T12:18:00+00:00", step
)
check_sensor_state(
hass, "sensor.washing_machine_finish", "2025-05-31T14:15:00+00:00", step
)
check_sensor_state(hass, "sensor.washing_machine_energy_consumption", "0.0", step)
check_sensor_state(hass, "sensor.washing_machine_water_consumption", "0", step)
@@ -406,7 +421,7 @@ async def test_laundry_wash_scenario(
},
}
freezer.tick(timedelta(seconds=130))
freezer.move_to("2025-05-31T12:34:00+00:00")
async_fire_time_changed(hass)
await hass.async_block_till_done()
@@ -426,7 +441,7 @@ async def test_laundry_wash_scenario(
device_fixture["DummyWasher"]["state"]["elapsedTime"][0] = 1
device_fixture["DummyWasher"]["state"]["elapsedTime"][1] = 49
freezer.tick(timedelta(seconds=130))
freezer.move_to("2025-05-31T14:07:00+00:00")
async_fire_time_changed(hass)
await hass.async_block_till_done()
step += 1
@@ -439,6 +454,12 @@ async def test_laundry_wash_scenario(
# RINSE HOLD -> elapsed, remaining time from API (normal case)
check_sensor_state(hass, "sensor.washing_machine_remaining_time", "8", step)
check_sensor_state(hass, "sensor.washing_machine_elapsed_time", "109", step)
check_sensor_state(
hass, "sensor.washing_machine_start", "2025-05-31T12:18:00+00:00", step
)
check_sensor_state(
hass, "sensor.washing_machine_finish", "2025-05-31T14:15:00+00:00", step
)
# Simulate program ended
device_fixture["DummyWasher"]["state"]["status"]["value_raw"] = 7
@@ -453,7 +474,7 @@ async def test_laundry_wash_scenario(
device_fixture["DummyWasher"]["state"]["elapsedTime"][1] = 0
device_fixture["DummyWasher"]["state"]["ecoFeedback"] = None
freezer.tick(timedelta(seconds=130))
freezer.move_to("2025-05-31T14:30:00+00:00")
async_fire_time_changed(hass)
await hass.async_block_till_done()
step += 1
@@ -469,6 +490,12 @@ async def test_laundry_wash_scenario(
check_sensor_state(hass, "sensor.washing_machine_remaining_time", "0", step)
# PROGRAM_ENDED -> elapsed time kept from last program (some devices immediately go to 0)
check_sensor_state(hass, "sensor.washing_machine_elapsed_time", "109", step)
check_sensor_state(
hass, "sensor.washing_machine_start", "2025-05-31T12:18:00+00:00", step
)
check_sensor_state(
hass, "sensor.washing_machine_finish", "2025-05-31T14:15:00+00:00", step
)
# consumption values now report the last known value; the API might start reporting a null object
check_sensor_state(hass, "sensor.washing_machine_energy_consumption", "0.1", step)
check_sensor_state(hass, "sensor.washing_machine_water_consumption", "7", step)
@@ -489,7 +516,7 @@ async def test_laundry_wash_scenario(
device_fixture["DummyWasher"]["state"]["elapsedTime"][0] = 0
device_fixture["DummyWasher"]["state"]["elapsedTime"][1] = 0
freezer.tick(timedelta(seconds=130))
freezer.move_to("2025-05-31T14:32:00+00:00")
async_fire_time_changed(hass)
await hass.async_block_till_done()
step += 1
@@ -504,6 +531,10 @@ async def test_laundry_wash_scenario(
# PROGRAMMED -> elapsed, remaining time from API (normal case)
check_sensor_state(hass, "sensor.washing_machine_remaining_time", "119", step)
check_sensor_state(hass, "sensor.washing_machine_elapsed_time", "0", step)
check_sensor_state(hass, "sensor.washing_machine_start", "unknown", step)
check_sensor_state(
hass, "sensor.washing_machine_finish", "2025-05-31T16:31:00+00:00", step
)
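The expected start and finish values in the checks above follow directly from the mocked clock and the API minute counters: at 12:32 with elapsedTime 0:14 and remainingTime 1:43 the start is 12:32 - 14 min = 12:18 and the finish is 12:32 + 103 min = 14:15, while in the PROGRAMMED step only the finish (14:32 + 119 min = 16:31) is known. A minimal sketch of that arithmetic, assuming the absolute time sensors are derived this way while a program is running (the integration's actual implementation may differ):

from datetime import UTC, datetime, timedelta


def derive_start_finish(
    now: datetime, elapsed_min: int, remaining_min: int
) -> tuple[datetime | None, datetime | None]:
    """Derive absolute start/finish from the API minute counters (sketch only)."""
    # A zero counter is treated as "not yet known", matching the PROGRAMMED step
    # above where start stays unknown while finish is already reported.
    start = now - timedelta(minutes=elapsed_min) if elapsed_min else None
    finish = now + timedelta(minutes=remaining_min) if remaining_min else None
    return start, finish


assert derive_start_finish(
    datetime(2025, 5, 31, 12, 32, tzinfo=UTC), elapsed_min=14, remaining_min=103
) == (
    datetime(2025, 5, 31, 12, 18, tzinfo=UTC),
    datetime(2025, 5, 31, 14, 15, tzinfo=UTC),
)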
@pytest.mark.parametrize("load_device_file", ["laundry.json"])
@@ -519,6 +550,7 @@ async def test_laundry_dry_scenario(
"""Parametrized test for verifying time sensors for tumble dryer devices when API reports time value from last cycle, when device is off."""
step = 0
freezer.move_to("2025-05-31T12:00:00+00:00")
# Initial state when the tumble dryer is off
check_sensor_state(hass, "sensor.tumble_dryer", "off", step)
@@ -528,6 +560,8 @@ async def test_laundry_dry_scenario(
# OFF -> elapsed, remaining forced to unknown (some devices continue reporting last value of last cycle)
check_sensor_state(hass, "sensor.tumble_dryer_remaining_time", "unknown", step)
check_sensor_state(hass, "sensor.tumble_dryer_elapsed_time", "unknown", step)
check_sensor_state(hass, "sensor.tumble_dryer_start", "unknown", step)
check_sensor_state(hass, "sensor.tumble_dryer_finish", "unknown", step)
# Simulate program started
device_fixture["DummyDryer"]["state"]["status"]["value_raw"] = 5
@@ -545,7 +579,7 @@ async def test_laundry_dry_scenario(
device_fixture["DummyDryer"]["state"]["dryingStep"]["value_raw"] = 2
device_fixture["DummyDryer"]["state"]["dryingStep"]["value_localized"] = "Normal"
freezer.tick(timedelta(seconds=130))
freezer.move_to("2025-05-31T12:30:00+00:00")
async_fire_time_changed(hass)
await hass.async_block_till_done()
step += 1
@@ -557,6 +591,12 @@ async def test_laundry_dry_scenario(
# IN_USE -> elapsed, remaining time from API (normal case)
check_sensor_state(hass, "sensor.tumble_dryer_remaining_time", "49", step)
check_sensor_state(hass, "sensor.tumble_dryer_elapsed_time", "20", step)
check_sensor_state(
hass, "sensor.tumble_dryer_start", "2025-05-31T12:10:00+00:00", step
)
check_sensor_state(
hass, "sensor.tumble_dryer_finish", "2025-05-31T13:19:00+00:00", step
)
# Simulate program end
device_fixture["DummyDryer"]["state"]["status"]["value_raw"] = 7
@@ -570,7 +610,7 @@ async def test_laundry_dry_scenario(
device_fixture["DummyDryer"]["state"]["elapsedTime"][0] = 1
device_fixture["DummyDryer"]["state"]["elapsedTime"][1] = 18
freezer.tick(timedelta(seconds=130))
freezer.move_to("2025-05-31T14:30:00+00:00")
async_fire_time_changed(hass)
await hass.async_block_till_done()
step += 1
@@ -583,9 +623,18 @@ async def test_laundry_dry_scenario(
check_sensor_state(hass, "sensor.tumble_dryer_remaining_time", "0", step)
# PROGRAM_ENDED -> elapsed time kept from last program (some devices immediately go to 0)
check_sensor_state(hass, "sensor.tumble_dryer_elapsed_time", "20", step)
check_sensor_state(
hass, "sensor.tumble_dryer_start", "2025-05-31T12:10:00+00:00", step
)
check_sensor_state(
hass, "sensor.tumble_dryer_finish", "2025-05-31T13:19:00+00:00", step
)
@pytest.mark.parametrize("restore_state", ["45", STATE_UNKNOWN, STATE_UNAVAILABLE])
@pytest.mark.parametrize(
"restore_state_abs", ["2025-05-31T13:19:00+00:00", STATE_UNKNOWN, STATE_UNAVAILABLE]
)
@pytest.mark.parametrize("load_device_file", ["laundry.json"])
@pytest.mark.parametrize("platforms", [(SENSOR_DOMAIN,)])
async def test_elapsed_time_sensor_restored(
@@ -596,10 +645,12 @@ async def test_elapsed_time_sensor_restored(
device_fixture: MieleDevices,
freezer: FrozenDateTimeFactory,
restore_state,
restore_state_abs,
) -> None:
"""Test that elapsed time returns the restored value when program ended."""
entity_id = "sensor.washing_machine_elapsed_time"
entity_id_abs = "sensor.washing_machine_finish"
# Simulate program started
device_fixture["DummyWasher"]["state"]["status"]["value_raw"] = 5
@@ -623,11 +674,12 @@ async def test_elapsed_time_sensor_restored(
device_fixture["DummyWasher"]["state"]["spinningSpeed"]["value_raw"] = 1200
device_fixture["DummyWasher"]["state"]["spinningSpeed"]["value_localized"] = "1200"
freezer.tick(timedelta(seconds=130))
freezer.move_to(datetime(2025, 5, 31, 12, 30, tzinfo=UTC))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == "12"
assert hass.states.get(entity_id_abs).state == "2025-05-31T14:15:00+00:00"
# Simulate program ended
device_fixture["DummyWasher"]["state"]["status"]["value_raw"] = 7
@@ -641,7 +693,7 @@ async def test_elapsed_time_sensor_restored(
device_fixture["DummyWasher"]["state"]["elapsedTime"][0] = 0
device_fixture["DummyWasher"]["state"]["elapsedTime"][1] = 0
freezer.tick(timedelta(seconds=130))
freezer.move_to(datetime(2025, 5, 31, 14, 20, tzinfo=UTC))
async_fire_time_changed(hass)
await hass.async_block_till_done()
@@ -651,6 +703,7 @@ async def test_elapsed_time_sensor_restored(
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == "unavailable"
assert hass.states.get(entity_id_abs).state == "unavailable"
# simulate a restore with a state different from the native value
mock_restore_cache_with_extra_data(
@@ -669,9 +722,19 @@ async def test_elapsed_time_sensor_restored(
"native_unit_of_measurement": "min",
},
),
(
State(
entity_id_abs,
restore_state_abs,
{"device_class": "timestamp"},
),
{
"native_value": datetime(2025, 5, 31, 14, 15, tzinfo=UTC),
"native_unit_of_measurement": None,
},
),
],
)
await hass.config_entries.async_reload(mock_config_entry.entry_id)
await hass.async_block_till_done()
@@ -679,3 +742,8 @@ async def test_elapsed_time_sensor_restored(
state = hass.states.get(entity_id)
assert state is not None
assert state.state == "12"
# check that the absolute time is the restored value and not the value reported by the API
state = hass.states.get(entity_id_abs)
assert state is not None
assert state.state == "2025-05-31T14:15:00+00:00"

View File

@@ -44,6 +44,17 @@ ENTRY_CONFIG = {
},
}
ENTRY_CONFIG_BLANK_QUERY = {
CONF_NAME: "Get Value",
CONF_QUERY: " ",
CONF_COLUMN_NAME: "value",
CONF_ADVANCED_OPTIONS: {
CONF_UNIT_OF_MEASUREMENT: "MiB",
CONF_DEVICE_CLASS: SensorDeviceClass.DATA_SIZE,
CONF_STATE_CLASS: SensorStateClass.TOTAL,
},
}
ENTRY_CONFIG_WITH_VALUE_TEMPLATE = {
CONF_QUERY: "SELECT 5 as value",
CONF_COLUMN_NAME: "value",
@@ -53,6 +64,33 @@ ENTRY_CONFIG_WITH_VALUE_TEMPLATE = {
},
}
ENTRY_CONFIG_WITH_QUERY_TEMPLATE = {
CONF_QUERY: "SELECT {% if states('sensor.input1')=='on' %} 5 {% else %} 6 {% endif %} as value",
CONF_COLUMN_NAME: "value",
CONF_ADVANCED_OPTIONS: {
CONF_UNIT_OF_MEASUREMENT: "MiB",
CONF_VALUE_TEMPLATE: "{{ value }}",
},
}
ENTRY_CONFIG_WITH_BROKEN_QUERY_TEMPLATE = {
CONF_QUERY: "SELECT {{ 5 as value",
CONF_COLUMN_NAME: "value",
CONF_ADVANCED_OPTIONS: {
CONF_UNIT_OF_MEASUREMENT: "MiB",
CONF_VALUE_TEMPLATE: "{{ value }}",
},
}
ENTRY_CONFIG_WITH_BROKEN_QUERY_TEMPLATE_OPT = {
CONF_QUERY: "SELECT {{ 5 as value",
CONF_COLUMN_NAME: "value",
CONF_ADVANCED_OPTIONS: {
CONF_UNIT_OF_MEASUREMENT: "MiB",
CONF_VALUE_TEMPLATE: "{{ value }}",
},
}
ENTRY_CONFIG_INVALID_QUERY = {
CONF_QUERY: "SELECT 5 FROM as value",
CONF_COLUMN_NAME: "size",

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from pathlib import Path
import re
from typing import Any
from unittest.mock import patch
@@ -10,7 +11,7 @@ import pytest
from sqlalchemy.exc import SQLAlchemyError
from homeassistant import config_entries
from homeassistant.components.recorder import CONF_DB_URL
from homeassistant.components.recorder import CONF_DB_URL, Recorder
from homeassistant.components.sensor import (
CONF_STATE_CLASS,
SensorDeviceClass,
@@ -29,7 +30,7 @@ from homeassistant.const import (
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.data_entry_flow import FlowResultType, InvalidData
from . import (
ENTRY_CONFIG,
@@ -48,6 +49,9 @@ from . import (
ENTRY_CONFIG_QUERY_NO_READ_ONLY_CTE,
ENTRY_CONFIG_QUERY_NO_READ_ONLY_CTE_OPT,
ENTRY_CONFIG_QUERY_NO_READ_ONLY_OPT,
ENTRY_CONFIG_WITH_BROKEN_QUERY_TEMPLATE,
ENTRY_CONFIG_WITH_BROKEN_QUERY_TEMPLATE_OPT,
ENTRY_CONFIG_WITH_QUERY_TEMPLATE,
ENTRY_CONFIG_WITH_VALUE_TEMPLATE,
)
@@ -106,7 +110,91 @@ async def test_form_simple(
}
async def test_form_with_value_template(hass: HomeAssistant) -> None:
async def test_form_with_query_template(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""Test for with query template."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
with patch(
"homeassistant.components.sql.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
DATA_CONFIG,
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
ENTRY_CONFIG_WITH_QUERY_TEMPLATE,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Get Value"
assert result["options"] == {
CONF_QUERY: "SELECT {% if states('sensor.input1')=='on' %} 5 {% else %} 6 {% endif %} as value",
CONF_COLUMN_NAME: "value",
CONF_ADVANCED_OPTIONS: {
CONF_UNIT_OF_MEASUREMENT: "MiB",
CONF_VALUE_TEMPLATE: "{{ value }}",
},
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_with_broken_query_template(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""Test form with broken query template."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
DATA_CONFIG,
)
message = re.escape("Schema validation failed @ data['query']")
with pytest.raises(InvalidData, match=message):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
ENTRY_CONFIG_WITH_BROKEN_QUERY_TEMPLATE,
)
with patch(
"homeassistant.components.sql.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
ENTRY_CONFIG_WITH_QUERY_TEMPLATE,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Get Value"
assert result["options"] == {
CONF_QUERY: "SELECT {% if states('sensor.input1')=='on' %} 5 {% else %} 6 {% endif %} as value",
CONF_COLUMN_NAME: "value",
CONF_ADVANCED_OPTIONS: {
CONF_UNIT_OF_MEASUREMENT: "MiB",
CONF_VALUE_TEMPLATE: "{{ value }}",
},
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_with_value_template(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""Test for with value template."""
result = await hass.config_entries.flow.async_init(
@@ -192,7 +280,7 @@ async def test_flow_fails_invalid_query(hass: HomeAssistant) -> None:
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {
CONF_QUERY: "query_invalid",
CONF_QUERY: "query_no_read_only",
}
result = await hass.config_entries.flow.async_configure(
@@ -202,7 +290,7 @@ async def test_flow_fails_invalid_query(hass: HomeAssistant) -> None:
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {
CONF_QUERY: "query_invalid",
CONF_QUERY: "query_no_read_only",
}
result = await hass.config_entries.flow.async_configure(
@@ -484,7 +572,7 @@ async def test_options_flow_fails_invalid_query(hass: HomeAssistant) -> None:
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {
CONF_QUERY: "query_invalid",
CONF_QUERY: "query_no_read_only",
}
result = await hass.config_entries.options.async_configure(
@@ -494,9 +582,8 @@ async def test_options_flow_fails_invalid_query(hass: HomeAssistant) -> None:
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {
CONF_QUERY: "query_invalid",
CONF_QUERY: "query_no_read_only",
}
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=ENTRY_CONFIG_QUERY_NO_READ_ONLY_OPT,
@@ -527,6 +614,13 @@ async def test_options_flow_fails_invalid_query(hass: HomeAssistant) -> None:
CONF_QUERY: "multiple_queries",
}
message = re.escape("Schema validation failed @ data['query']")
with pytest.raises(InvalidData, match=message):
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=ENTRY_CONFIG_WITH_BROKEN_QUERY_TEMPLATE_OPT,
)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={

View File

@@ -4,6 +4,9 @@ from __future__ import annotations
from unittest.mock import patch
import pytest
import voluptuous as vol
from homeassistant.components.recorder import CONF_DB_URL, Recorder
from homeassistant.components.sensor import (
CONF_STATE_CLASS,
@@ -16,6 +19,7 @@ from homeassistant.components.sql.const import (
CONF_QUERY,
DOMAIN,
)
from homeassistant.components.sql.util import validate_sql_select
from homeassistant.config_entries import SOURCE_USER, ConfigEntryState
from homeassistant.const import (
CONF_DEVICE_CLASS,
@@ -24,6 +28,7 @@ from homeassistant.const import (
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.template import Template
from homeassistant.setup import async_setup_component
from . import YAML_CONFIG_INVALID, YAML_CONFIG_NO_DB, init_integration
@@ -67,6 +72,45 @@ async def test_setup_invalid_config(
await hass.async_block_till_done()
async def test_invalid_query(hass: HomeAssistant) -> None:
"""Test invalid query."""
with pytest.raises(vol.Invalid, match="SQL query must be of type SELECT"):
validate_sql_select(Template("DROP TABLE *", hass))
with pytest.raises(vol.Invalid, match="SQL query is empty or unknown type"):
validate_sql_select(Template("SELECT5 as value", hass))
with pytest.raises(vol.Invalid, match="SQL query is empty or unknown type"):
validate_sql_select(Template(";;", hass))
async def test_query_no_read_only(hass: HomeAssistant) -> None:
"""Test query no read only."""
with pytest.raises(vol.Invalid, match="SQL query must be of type SELECT"):
validate_sql_select(
Template("UPDATE states SET state = 999999 WHERE state_id = 11125", hass)
)
async def test_query_no_read_only_cte(hass: HomeAssistant) -> None:
"""Test query no read only CTE."""
with pytest.raises(vol.Invalid, match="SQL query must be of type SELECT"):
validate_sql_select(
Template(
"WITH test AS (SELECT state FROM states) UPDATE states SET states.state = test.state;",
hass,
)
)
async def test_multiple_queries(hass: HomeAssistant) -> None:
"""Test multiple queries."""
with pytest.raises(vol.Invalid, match="Multiple SQL statements are not allowed"):
validate_sql_select(
Template("SELECT 5 as value; UPDATE states SET state = 10;", hass)
)
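The three error messages asserted above ("SQL query must be of type SELECT", "SQL query is empty or unknown type", "Multiple SQL statements are not allowed") map onto the usual sqlparse-style checks. A hedged sketch of a validator that would produce them, now taking a Template since the query may contain Jinja; the real validate_sql_select in homeassistant.components.sql.util may differ in detail:

import sqlparse
import voluptuous as vol

from homeassistant.exceptions import TemplateError
from homeassistant.helpers.template import Template


def validate_sql_select_sketch(value: Template) -> Template:
    """Check that the (possibly templated) query is a single SELECT statement."""
    try:
        # Broken Jinja such as "SELECT {{ 5 as value" should fail schema
        # validation, as the config-flow tests above expect (assumption).
        value.ensure_valid()
    except TemplateError as err:
        raise vol.Invalid(str(err)) from err
    statements = sqlparse.parse(value.template.lstrip().lstrip(";"))
    if len(statements) > 1:
        raise vol.Invalid("Multiple SQL statements are not allowed")
    if not statements or (query_type := statements[0].get_type()) == "UNKNOWN":
        raise vol.Invalid("SQL query is empty or unknown type")
    if query_type != "SELECT":
        raise vol.Invalid("SQL query must be of type SELECT")
    return value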
async def test_migration_from_future(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:

View File

@@ -39,7 +39,6 @@ from homeassistant.const import (
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import issue_registry as ir
from homeassistant.helpers.entity_platform import async_get_platforms
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
@@ -109,6 +108,33 @@ async def test_query_value_template(
}
async def test_template_query(
recorder_mock: Recorder,
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test the SQL sensor with a query template."""
options = {
CONF_QUERY: "SELECT {% if states('sensor.input1')=='on' %} 5 {% else %} 6 {% endif %} as value",
CONF_COLUMN_NAME: "value",
CONF_ADVANCED_OPTIONS: {
CONF_VALUE_TEMPLATE: "{{ value | int }}",
},
}
await init_integration(hass, title="count_tables", options=options)
state = hass.states.get("sensor.count_tables")
assert state.state == "6"
hass.states.async_set("sensor.input1", "on")
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
state = hass.states.get("sensor.count_tables")
assert state.state == "5"
async def test_query_value_template_invalid(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
@@ -124,6 +150,59 @@ async def test_query_value_template_invalid(
assert state.state == "5.01"
async def test_broken_template_query(
recorder_mock: Recorder,
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test the SQL sensor with a query template which is broken."""
options = {
CONF_QUERY: "SELECT {{ 5 as value",
CONF_COLUMN_NAME: "value",
CONF_ADVANCED_OPTIONS: {
CONF_VALUE_TEMPLATE: "{{ value | int }}",
},
}
await init_integration(hass, title="count_tables", options=options)
state = hass.states.get("sensor.count_tables")
assert not state
async def test_broken_template_query_2(
recorder_mock: Recorder,
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test the SQL sensor with a query template."""
hass.states.async_set("sensor.input1", "5")
await hass.async_block_till_done(wait_background_tasks=True)
options = {
CONF_QUERY: "SELECT {{ states.sensor.input1.state | int / 1000}} as value",
CONF_COLUMN_NAME: "value",
}
await init_integration(hass, title="count_tables", options=options)
state = hass.states.get("sensor.count_tables")
assert state.state == "0.005"
hass.states.async_set("sensor.input1", "on")
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done(wait_background_tasks=True)
state = hass.states.get("sensor.count_tables")
assert state.state == "0.005"
assert (
"Error rendering query SELECT {{ states.sensor.input1.state | int / 1000}} as value"
" LIMIT 1;: ValueError: Template error: int got invalid input 'on' when rendering"
" template 'SELECT {{ states.sensor.input1.state | int / 1000}} as value LIMIT 1;'"
" but no default was specified" in caplog.text
)
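The log assertion above implies the sensor renders the query template on every update, appends "LIMIT 1;", and keeps its previous state when rendering fails. A hedged sketch of that render-and-fall-back step; the actual sensor code in homeassistant.components.sql.sensor may build the query and handle errors differently:

import logging

from homeassistant.exceptions import TemplateError
from homeassistant.helpers.template import Template

_LOGGER = logging.getLogger(__name__)


def render_query(query_template: Template, last_good_query: str | None) -> str | None:
    """Render the templated query, falling back to the last good render on errors."""
    # LIMIT 1 is appended unconditionally here for brevity; the integration
    # presumably only adds it when the query has no LIMIT clause (assumption).
    query = f"{query_template.template} LIMIT 1;"
    try:
        return Template(query, query_template.hass).async_render(parse_result=False)
    except (TemplateError, ValueError) as err:
        # Produces a message of the shape asserted above; the previous query
        # (and therefore the previous sensor state) is kept.
        _LOGGER.error(
            "Error rendering query %s: %s: %s", query, type(err).__name__, err
        )
        return last_good_query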
async def test_query_limit(recorder_mock: Recorder, hass: HomeAssistant) -> None:
"""Test the SQL sensor with a query containing 'LIMIT' in lowercase."""
options = {
@@ -641,17 +720,14 @@ async def test_query_recover_from_rollback(
CONF_UNIQUE_ID: "very_unique_id",
}
await init_integration(hass, title="Select value SQL query", options=options)
platforms = async_get_platforms(hass, "sql")
sql_entity = platforms[0].entities["sensor.select_value_sql_query"]
state = hass.states.get("sensor.select_value_sql_query")
assert state.state == "5"
assert state.attributes["value"] == 5
with patch.object(
sql_entity,
"_lambda_stmt",
generate_lambda_stmt("Faulty syntax create operational issue"),
with patch(
"homeassistant.components.sql.sensor.generate_lambda_stmt",
return_value=generate_lambda_stmt("Faulty syntax create operational issue"),
):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)

View File

@@ -153,7 +153,7 @@ async def test_query_service_invalid_query_not_select(
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
with pytest.raises(vol.Invalid, match="Only SELECT queries allowed"):
with pytest.raises(vol.Invalid, match="SQL query must be of type SELECT"):
await hass.services.async_call(
DOMAIN,
SERVICE_QUERY,
@@ -171,7 +171,7 @@ async def test_query_service_sqlalchemy_error(
await async_setup_component(hass, DOMAIN, {})
await hass.async_block_till_done()
with pytest.raises(MultipleInvalid, match="Invalid SQL query"):
with pytest.raises(MultipleInvalid, match="SQL query is empty or unknown type"):
await hass.services.async_call(
DOMAIN,
SERVICE_QUERY,

View File

@@ -13,6 +13,7 @@ from homeassistant.components.sql.util import (
validate_sql_select,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.template import Template
async def test_resolve_db_url_when_none_configured(
@@ -39,27 +40,27 @@ async def test_resolve_db_url_when_configured(hass: HomeAssistant) -> None:
[
(
"DROP TABLE *",
"Only SELECT queries allowed",
"SQL query must be of type SELECT",
),
(
"SELECT5 as value",
"Invalid SQL query",
"SQL query is empty or unknown type",
),
(
";;",
"Invalid SQL query",
"SQL query is empty or unknown type",
),
(
"UPDATE states SET state = 999999 WHERE state_id = 11125",
"Only SELECT queries allowed",
"SQL query must be of type SELECT",
),
(
"WITH test AS (SELECT state FROM states) UPDATE states SET states.state = test.state;",
"Only SELECT queries allowed",
"SQL query must be of type SELECT",
),
(
"SELECT 5 as value; UPDATE states SET state = 10;",
"Multiple SQL queries are not supported",
"Multiple SQL statements are not allowed",
),
],
)
@@ -70,7 +71,7 @@ async def test_invalid_sql_queries(
) -> None:
"""Test that various invalid or disallowed SQL queries raise the correct exception."""
with pytest.raises(vol.Invalid, match=expected_error_message):
validate_sql_select(sql_query)
validate_sql_select(Template(sql_query, hass))
@pytest.mark.parametrize(

View File

@@ -450,10 +450,10 @@ async def test_caching(hass: HomeAssistant) -> None:
side_effect=translation.build_resources,
) as mock_build_resources:
load1 = await translation.async_get_translations(hass, "en", "entity_component")
assert len(mock_build_resources.mock_calls) == 7
assert len(mock_build_resources.mock_calls) == 8
load2 = await translation.async_get_translations(hass, "en", "entity_component")
assert len(mock_build_resources.mock_calls) == 7
assert len(mock_build_resources.mock_calls) == 8
assert load1 == load2

View File

@@ -190,6 +190,7 @@ def test_run_json_flag_only() -> None:
with (
patch("builtins.print") as mock_print,
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", "--json"]),
):
mock_check.return_value = {
"except": {"domain1": ["error1", "error2"]},
@@ -200,7 +201,7 @@ def test_run_json_flag_only() -> None:
"yaml_files": {},
}
exit_code = check_config.run(["--json"])
exit_code = check_config.run(None)
# Should exit with code 1 (1 domain with errors)
assert exit_code == 1
@@ -233,7 +234,10 @@ def test_run_json_flag_only() -> None:
def test_run_fail_on_warnings_flag_only() -> None:
"""Test that --fail-on-warnings flag works independently."""
# Test with warnings only
with patch.object(check_config, "check") as mock_check:
with (
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", "--fail-on-warnings"]),
):
mock_check.return_value = {
"except": {},
"warn": {"light": ["warning message"]},
@@ -243,7 +247,7 @@ def test_run_fail_on_warnings_flag_only() -> None:
"yaml_files": {},
}
exit_code = check_config.run(["--fail-on-warnings"])
exit_code = check_config.run(None)
assert exit_code == 1 # Should exit non-zero due to warnings
# Test with no warnings or errors
@@ -282,6 +286,7 @@ def test_run_json_output_structure() -> None:
with (
patch("builtins.print") as mock_print,
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", "--json", "--config", "/test/path"]),
):
mock_check.return_value = {
"except": {"domain1": ["error1", {"config": "bad"}]},
@@ -292,7 +297,7 @@ def test_run_json_output_structure() -> None:
"yaml_files": {},
}
exit_code = check_config.run(["--json", "--config", "/test/path"])
exit_code = check_config.run(None)
json_output = mock_print.call_args[0][0]
parsed_json = json.loads(json_output)
@@ -413,7 +418,11 @@ def test_run_exit_code_logic() -> None:
]
for errors, warnings, flags, expected_exit in test_cases:
with patch("builtins.print"), patch.object(check_config, "check") as mock_check:
with (
patch("builtins.print"),
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", *flags]),
):
mock_check.return_value = {
"except": errors,
"warn": warnings,
@@ -423,7 +432,7 @@ def test_run_exit_code_logic() -> None:
"yaml_files": {},
}
exit_code = check_config.run(flags)
exit_code = check_config.run(None)
assert exit_code == expected_exit, (
f"Failed for errors={errors}, warnings={warnings}, flags={flags}. "
f"Expected {expected_exit}, got {exit_code}"
@@ -447,7 +456,7 @@ def test_run_human_readable_still_works() -> None:
"yaml_files": {},
}
check_config.run([])
check_config.run(None)
# Should print the "Testing configuration at" message
printed_outputs = [
@@ -463,9 +472,11 @@ def test_run_human_readable_still_works() -> None:
def test_run_with_config_path() -> None:
"""Test that config path is correctly included in JSON output."""
test_config_path = "/custom/config/path"
with (
patch("builtins.print") as mock_print,
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", "--json", "--config", test_config_path]),
):
mock_check.return_value = {
"except": {},
@@ -476,8 +487,7 @@ def test_run_with_config_path() -> None:
"yaml_files": {},
}
test_config_path = "/custom/config/path"
check_config.run(["--json", "--config", test_config_path])
check_config.run(None)
json_output = mock_print.call_args[0][0]
parsed_json = json.loads(json_output)
@@ -495,6 +505,7 @@ def test_unknown_arguments_with_json() -> None:
with (
patch("builtins.print") as mock_print,
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", "--json", "--unknown-flag", "value"]),
):
mock_check.return_value = {
"except": {},
@@ -505,7 +516,7 @@ def test_unknown_arguments_with_json() -> None:
"yaml_files": {},
}
check_config.run(["--json", "--unknown-flag", "value"])
check_config.run(None)
# Should still print unknown argument warning AND JSON
assert mock_print.call_count == 2
@@ -528,6 +539,7 @@ def test_info_flag_with_json() -> None:
with (
patch("builtins.print") as mock_print,
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", "--json", "--info", "light"]),
):
mock_check.return_value = {
"except": {},
@@ -539,7 +551,7 @@ def test_info_flag_with_json() -> None:
}
# Test --json with --info - JSON should take precedence
exit_code = check_config.run(["--json", "--info", "light"])
exit_code = check_config.run(None)
assert exit_code == 0
assert mock_print.call_count == 1
@@ -564,6 +576,7 @@ def test_config_flag_variations() -> None:
with (
patch("builtins.print") as mock_print,
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", *flags]),
):
mock_check.return_value = {
"except": {},
@@ -574,7 +587,7 @@ def test_config_flag_variations() -> None:
"yaml_files": {},
}
check_config.run(flags)
check_config.run(None)
if "--json" in flags:
json_output = json.loads(mock_print.call_args[0][0])
@@ -587,6 +600,10 @@ def test_multiple_config_flags() -> None:
with (
patch("builtins.print") as mock_print,
patch.object(check_config, "check") as mock_check,
patch(
"sys.argv",
["", "--json", "--config", "/first/path", "--config", "/second/path"],
),
):
mock_check.return_value = {
"except": {},
@@ -598,9 +615,7 @@ def test_multiple_config_flags() -> None:
}
# Last config flag should win
check_config.run(
["--json", "--config", "/first/path", "--config", "/second/path"]
)
check_config.run(None)
json_output = json.loads(mock_print.call_args[0][0])
expected_path = os.path.join(os.getcwd(), "/second/path")
@@ -622,6 +637,7 @@ def test_fail_on_warnings_with_json_combinations() -> None:
with (
patch("builtins.print") as mock_print,
patch.object(check_config, "check") as mock_check,
patch("sys.argv", ["", "--json", "--fail-on-warnings"]),
):
mock_check.return_value = {
"except": errors,
@@ -632,7 +648,7 @@ def test_fail_on_warnings_with_json_combinations() -> None:
"yaml_files": {},
}
exit_code = check_config.run(["--json", "--fail-on-warnings"])
exit_code = check_config.run(None)
assert exit_code == expected_exit
# Should still output valid JSON
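All of these tests now patch sys.argv and call check_config.run(None) instead of passing the flag list directly, relying on argparse's behaviour of reading sys.argv[1:] when parse_args/parse_known_args receive None. A minimal sketch of that pattern; the flag names mirror the ones used in the tests, and this is not the actual check_config script:

import argparse
import sys


def run(script_args: list[str] | None) -> int:
    """Parse CLI flags; None makes argparse fall back to sys.argv[1:]."""
    parser = argparse.ArgumentParser(description="Check Home Assistant configuration")
    parser.add_argument("--json", action="store_true")
    parser.add_argument("--fail-on-warnings", action="store_true")
    parser.add_argument("-c", "--config", default=None)
    # Passing None here is what makes patch("sys.argv", [...]) take effect.
    args, unknown = parser.parse_known_args(script_args)
    if unknown:
        print(f"Unknown arguments: {', '.join(unknown)}")
    print(args.json, args.fail_on_warnings, args.config)
    return 0


if __name__ == "__main__":
    sys.exit(run(None))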

View File

@@ -1,43 +1,10 @@
"""Test const module."""
from enum import Enum
import pytest
from homeassistant import const
from .common import help_test_all, import_and_test_deprecated_constant
def _create_tuples(
value: type[Enum] | list[Enum], constant_prefix: str
) -> list[tuple[Enum, str]]:
return [(enum, constant_prefix) for enum in value]
from .common import help_test_all
def test_all() -> None:
"""Test module.__all__ is correctly set."""
help_test_all(const)
@pytest.mark.parametrize(
("replacement", "constant_name", "breaks_in_version"),
[
(const.UnitOfArea.SQUARE_METERS, "AREA_SQUARE_METERS", "2025.12"),
],
)
def test_deprecated_constant_name_changes(
caplog: pytest.LogCaptureFixture,
replacement: Enum,
constant_name: str,
breaks_in_version: str,
) -> None:
"""Test deprecated constants, where the name is not the same as the enum value."""
import_and_test_deprecated_constant(
caplog,
const,
constant_name,
f"{replacement.__class__.__name__}.{replacement.name}",
replacement,
breaks_in_version,
)