Move sun conditions to the sun integration (#144742)

This commit is contained in:
Erik Montnemery 2025-05-13 00:03:37 +02:00 committed by GitHub
parent e69ca0cf80
commit 0128d85999
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1405 additions and 1340 deletions

View File

@ -0,0 +1,136 @@
"""Offer sun based automation rules."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import cast
import voluptuous as vol
from homeassistant.const import CONF_CONDITION, SUN_EVENT_SUNRISE, SUN_EVENT_SUNSET
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.condition import (
ConditionCheckerType,
condition_trace_set_result,
condition_trace_update_result,
trace_condition_function,
)
from homeassistant.helpers.sun import get_astral_event_date
from homeassistant.helpers.typing import ConfigType, TemplateVarsType
from homeassistant.util import dt as dt_util
CONDITION_SCHEMA = vol.All(
vol.Schema(
{
**cv.CONDITION_BASE_SCHEMA,
vol.Required(CONF_CONDITION): "sun",
vol.Optional("before"): cv.sun_event,
vol.Optional("before_offset"): cv.time_period,
vol.Optional("after"): vol.All(
vol.Lower, vol.Any(SUN_EVENT_SUNSET, SUN_EVENT_SUNRISE)
),
vol.Optional("after_offset"): cv.time_period,
}
),
cv.has_at_least_one_key("before", "after"),
)
def sun(
hass: HomeAssistant,
before: str | None = None,
after: str | None = None,
before_offset: timedelta | None = None,
after_offset: timedelta | None = None,
) -> bool:
"""Test if current time matches sun requirements."""
utcnow = dt_util.utcnow()
today = dt_util.as_local(utcnow).date()
before_offset = before_offset or timedelta(0)
after_offset = after_offset or timedelta(0)
sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, today)
sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, today)
has_sunrise_condition = SUN_EVENT_SUNRISE in (before, after)
has_sunset_condition = SUN_EVENT_SUNSET in (before, after)
after_sunrise = today > dt_util.as_local(cast(datetime, sunrise)).date()
if after_sunrise and has_sunrise_condition:
tomorrow = today + timedelta(days=1)
sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, tomorrow)
after_sunset = today > dt_util.as_local(cast(datetime, sunset)).date()
if after_sunset and has_sunset_condition:
tomorrow = today + timedelta(days=1)
sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, tomorrow)
# Special case: before sunrise OR after sunset
# This will handle the very rare case in the polar region when the sun rises/sets
# but does not set/rise.
# However this entire condition does not handle those full days of darkness
# or light, the following should be used instead:
#
# condition:
# condition: state
# entity_id: sun.sun
# state: 'above_horizon' (or 'below_horizon')
#
if before == SUN_EVENT_SUNRISE and after == SUN_EVENT_SUNSET:
wanted_time_before = cast(datetime, sunrise) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
wanted_time_after = cast(datetime, sunset) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
return utcnow < wanted_time_before or utcnow > wanted_time_after
if sunrise is None and has_sunrise_condition:
# There is no sunrise today
condition_trace_set_result(False, message="no sunrise today")
return False
if sunset is None and has_sunset_condition:
# There is no sunset today
condition_trace_set_result(False, message="no sunset today")
return False
if before == SUN_EVENT_SUNRISE:
wanted_time_before = cast(datetime, sunrise) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
if utcnow > wanted_time_before:
return False
if before == SUN_EVENT_SUNSET:
wanted_time_before = cast(datetime, sunset) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
if utcnow > wanted_time_before:
return False
if after == SUN_EVENT_SUNRISE:
wanted_time_after = cast(datetime, sunrise) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
if utcnow < wanted_time_after:
return False
if after == SUN_EVENT_SUNSET:
wanted_time_after = cast(datetime, sunset) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
if utcnow < wanted_time_after:
return False
return True
def async_condition_from_config(config: ConfigType) -> ConditionCheckerType:
"""Wrap action method with sun based condition."""
before = config.get("before")
after = config.get("after")
before_offset = config.get("before_offset")
after_offset = config.get("after_offset")
@trace_condition_function
def sun_if(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
"""Validate time based if-condition."""
return sun(hass, before, after, before_offset, after_offset)
return sun_if

View File

@ -42,8 +42,6 @@ from homeassistant.const import (
ENTITY_MATCH_ANY,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
SUN_EVENT_SUNRISE,
SUN_EVENT_SUNSET,
WEEKDAYS,
)
from homeassistant.core import HomeAssistant, State, callback
@ -60,7 +58,6 @@ from homeassistant.util import dt as dt_util
from homeassistant.util.async_ import run_callback_threadsafe
from . import config_validation as cv, entity_registry as er
from .sun import get_astral_event_date
from .template import Template, render_complex
from .trace import (
TraceElement,
@ -85,7 +82,6 @@ _PLATFORM_ALIASES = {
"numeric_state": None,
"or": None,
"state": None,
"sun": None,
"template": None,
"time": None,
"trigger": None,
@ -655,105 +651,6 @@ def state_from_config(config: ConfigType) -> ConditionCheckerType:
return if_state
def sun(
hass: HomeAssistant,
before: str | None = None,
after: str | None = None,
before_offset: timedelta | None = None,
after_offset: timedelta | None = None,
) -> bool:
"""Test if current time matches sun requirements."""
utcnow = dt_util.utcnow()
today = dt_util.as_local(utcnow).date()
before_offset = before_offset or timedelta(0)
after_offset = after_offset or timedelta(0)
sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, today)
sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, today)
has_sunrise_condition = SUN_EVENT_SUNRISE in (before, after)
has_sunset_condition = SUN_EVENT_SUNSET in (before, after)
after_sunrise = today > dt_util.as_local(cast(datetime, sunrise)).date()
if after_sunrise and has_sunrise_condition:
tomorrow = today + timedelta(days=1)
sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, tomorrow)
after_sunset = today > dt_util.as_local(cast(datetime, sunset)).date()
if after_sunset and has_sunset_condition:
tomorrow = today + timedelta(days=1)
sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, tomorrow)
# Special case: before sunrise OR after sunset
# This will handle the very rare case in the polar region when the sun rises/sets
# but does not set/rise.
# However this entire condition does not handle those full days of darkness
# or light, the following should be used instead:
#
# condition:
# condition: state
# entity_id: sun.sun
# state: 'above_horizon' (or 'below_horizon')
#
if before == SUN_EVENT_SUNRISE and after == SUN_EVENT_SUNSET:
wanted_time_before = cast(datetime, sunrise) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
wanted_time_after = cast(datetime, sunset) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
return utcnow < wanted_time_before or utcnow > wanted_time_after
if sunrise is None and has_sunrise_condition:
# There is no sunrise today
condition_trace_set_result(False, message="no sunrise today")
return False
if sunset is None and has_sunset_condition:
# There is no sunset today
condition_trace_set_result(False, message="no sunset today")
return False
if before == SUN_EVENT_SUNRISE:
wanted_time_before = cast(datetime, sunrise) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
if utcnow > wanted_time_before:
return False
if before == SUN_EVENT_SUNSET:
wanted_time_before = cast(datetime, sunset) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
if utcnow > wanted_time_before:
return False
if after == SUN_EVENT_SUNRISE:
wanted_time_after = cast(datetime, sunrise) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
if utcnow < wanted_time_after:
return False
if after == SUN_EVENT_SUNSET:
wanted_time_after = cast(datetime, sunset) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
if utcnow < wanted_time_after:
return False
return True
def sun_from_config(config: ConfigType) -> ConditionCheckerType:
"""Wrap action method with sun based condition."""
before = config.get("before")
after = config.get("after")
before_offset = config.get("before_offset")
after_offset = config.get("after_offset")
@trace_condition_function
def sun_if(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
"""Validate time based if-condition."""
return sun(hass, before, after, before_offset, after_offset)
return sun_if
def template(
hass: HomeAssistant, value_template: Template, variables: TemplateVarsType = None
) -> bool:
@ -1054,8 +951,10 @@ async def async_validate_condition_config(
return config
platform = await _async_get_condition_platform(hass, config)
if platform is not None and hasattr(platform, "async_validate_condition_config"):
return await platform.async_validate_condition_config(hass, config)
if platform is not None:
if hasattr(platform, "async_validate_condition_config"):
return await platform.async_validate_condition_config(hass, config)
return cast(ConfigType, platform.CONDITION_SCHEMA(config))
if platform is None and condition in ("numeric_state", "state"):
validator = cast(
Callable[[HomeAssistant, ConfigType], ConfigType],

View File

@ -1090,7 +1090,7 @@ type ValueSchemas = dict[Hashable, VolSchemaType | Callable[[Any], dict[str, Any
def key_value_schemas(
key: str,
value_schemas: ValueSchemas,
default_schema: VolSchemaType | None = None,
default_schema: VolSchemaType | Callable[[Any], dict[str, Any]] | None = None,
default_description: str | None = None,
) -> Callable[[Any], dict[Hashable, Any]]:
"""Create a validator that validates based on a value for specific key.
@ -1745,18 +1745,35 @@ BUILT_IN_CONDITIONS: ValueSchemas = {
"numeric_state": NUMERIC_STATE_CONDITION_SCHEMA,
"or": OR_CONDITION_SCHEMA,
"state": STATE_CONDITION_SCHEMA,
"sun": SUN_CONDITION_SCHEMA,
"template": TEMPLATE_CONDITION_SCHEMA,
"time": TIME_CONDITION_SCHEMA,
"trigger": TRIGGER_CONDITION_SCHEMA,
"zone": ZONE_CONDITION_SCHEMA,
}
# This is first round of validation, we don't want to mutate the config here already,
# just ensure basics as condition type and alias are there.
def _base_condition_validator(value: Any) -> Any:
vol.Schema(
{
**CONDITION_BASE_SCHEMA,
CONF_CONDITION: vol.NotIn(BUILT_IN_CONDITIONS),
},
extra=vol.ALLOW_EXTRA,
)(value)
return value
CONDITION_SCHEMA: vol.Schema = vol.Schema(
vol.Any(
vol.All(
expand_condition_shorthand,
key_value_schemas(CONF_CONDITION, BUILT_IN_CONDITIONS),
key_value_schemas(
CONF_CONDITION,
BUILT_IN_CONDITIONS,
_base_condition_validator,
),
),
dynamic_template_condition,
)
@ -1783,7 +1800,10 @@ CONDITION_ACTION_SCHEMA: vol.Schema = vol.Schema(
key_value_schemas(
CONF_CONDITION,
BUILT_IN_CONDITIONS,
dynamic_template_condition_action,
vol.Any(
dynamic_template_condition_action,
_base_condition_validator,
),
"a list of conditions or a valid template",
),
)
@ -1842,7 +1862,7 @@ def _base_trigger_list_flatten(triggers: list[Any]) -> list[Any]:
return flatlist
# This is first round of validation, we don't want to process the config here already,
# This is first round of validation, we don't want to mutate the config here already,
# just ensure basics as platform and ID are there.
def _base_trigger_validator(value: Any) -> Any:
_base_trigger_validator_schema(value)

File diff suppressed because it is too large Load Diff

View File

@ -2529,9 +2529,8 @@ async def test_validate_config_works(
"state": "paulus",
},
(
"Unexpected value for condition: 'non_existing'. Expected and, device,"
" not, numeric_state, or, state, sun, template, time, trigger, zone "
"@ data[0]"
"Invalid condition \"non_existing\" specified {'condition': "
"'non_existing', 'entity_id': 'hello.world', 'state': 'paulus'}"
),
),
# Raises HomeAssistantError

File diff suppressed because it is too large Load Diff

View File

@ -1460,11 +1460,6 @@ def test_key_value_schemas_with_default() -> None:
[
({"delay": "{{ invalid"}, "should be format 'HH:MM'"),
({"wait_template": "{{ invalid"}, "invalid template"),
({"condition": "invalid"}, "Unexpected value for condition: 'invalid'"),
(
{"condition": "not", "conditions": {"condition": "invalid"}},
"Unexpected value for condition: 'invalid'",
),
# The validation error message could be improved to explain that this is not
# a valid shorthand template
(
@ -1496,7 +1491,7 @@ def test_key_value_schemas_with_default() -> None:
)
@pytest.mark.usefixtures("hass")
def test_script(caplog: pytest.LogCaptureFixture, config: dict, error: str) -> None:
"""Test script validation is user friendly."""
"""Test script action validation is user friendly."""
with pytest.raises(vol.Invalid, match=error):
cv.script_action(config)