Allow time triggers with offsets to use input_datetimes (#131550)

This commit is contained in:
Petro31 2025-01-22 16:57:13 -05:00 committed by GitHub
parent cad49453eb
commit 4a7e009f27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 103 additions and 13 deletions

View File

@ -42,7 +42,7 @@ _TIME_AT_SCHEMA = vol.Any(cv.time, _TIME_TRIGGER_ENTITY)
_TIME_TRIGGER_ENTITY_WITH_OFFSET = vol.Schema(
{
vol.Required(CONF_ENTITY_ID): cv.entity_domain(["sensor"]),
vol.Required(CONF_ENTITY_ID): cv.entity_domain(["input_datetime", "sensor"]),
vol.Optional(CONF_OFFSET): cv.time_period,
}
)
@ -156,14 +156,17 @@ async def async_attach_trigger(
if has_date:
# If input_datetime has date, then track point in time.
trigger_dt = datetime(
year,
month,
day,
hour,
minute,
second,
tzinfo=dt_util.get_default_time_zone(),
trigger_dt = (
datetime(
year,
month,
day,
hour,
minute,
second,
tzinfo=dt_util.get_default_time_zone(),
)
+ offset
)
# Only set up listener if time is now or in the future.
if trigger_dt >= dt_util.now():
@ -178,6 +181,17 @@ async def async_attach_trigger(
)
elif has_time:
# Else if it has time, then track time change.
if offset != timedelta(0):
# Create a temporary datetime object to get an offset.
temp_dt = dt_util.now().replace(
hour=hour, minute=minute, second=second, microsecond=0
)
temp_dt += offset
# Ignore the date and apply the offset even if it wraps
# around to the next day.
hour = temp_dt.hour
minute = temp_dt.minute
second = temp_dt.second
remove = async_track_time_change(
hass,
partial(

View File

@ -156,6 +156,86 @@ async def test_if_fires_using_at_input_datetime(
)
@pytest.mark.parametrize(("hour"), [0, 5, 23])
@pytest.mark.parametrize(
("has_date", "has_time"), [(True, True), (False, True), (True, False)]
)
@pytest.mark.parametrize(
("offset", "delta"),
[
("00:00:10", timedelta(seconds=10)),
("-00:00:10", timedelta(seconds=-10)),
({"minutes": 5}, timedelta(minutes=5)),
("01:00:10", timedelta(hours=1, seconds=10)),
],
)
async def test_if_fires_using_at_input_datetime_with_offset(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
service_calls: list[ServiceCall],
has_date: bool,
has_time: bool,
offset: str,
delta: timedelta,
hour: int,
) -> None:
"""Test for firing at input_datetime."""
await async_setup_component(
hass,
"input_datetime",
{"input_datetime": {"trigger": {"has_date": has_date, "has_time": has_time}}},
)
now = dt_util.now()
start_dt = now.replace(
hour=hour if has_time else 0, minute=0, second=0, microsecond=0
) + timedelta(2)
trigger_dt = start_dt + delta
await hass.services.async_call(
"input_datetime",
"set_datetime",
{
ATTR_ENTITY_ID: "input_datetime.trigger",
"datetime": str(start_dt.replace(tzinfo=None)),
},
blocking=True,
)
await hass.async_block_till_done()
time_that_will_not_match_right_away = trigger_dt - timedelta(minutes=1)
some_data = "{{ trigger.platform }}-{{ trigger.now.day }}-{{ trigger.now.hour }}-{{trigger.entity_id}}"
freezer.move_to(dt_util.as_utc(time_that_will_not_match_right_away))
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"platform": "time",
"at": {"entity_id": "input_datetime.trigger", "offset": offset},
},
"action": {
"service": "test.automation",
"data_template": {"some": some_data},
},
}
},
)
await hass.async_block_till_done()
async_fire_time_changed(hass, trigger_dt + timedelta(seconds=1))
await hass.async_block_till_done()
assert len(service_calls) == 2
assert (
service_calls[1].data["some"]
== f"time-{trigger_dt.day}-{trigger_dt.hour}-input_datetime.trigger"
)
@pytest.mark.parametrize(
("conf_at", "trigger_deltas"),
[
@ -654,10 +734,6 @@ def test_schema_valid(conf) -> None:
{"platform": "time", "at": "binary_sensor.bla"},
{"platform": "time", "at": 745},
{"platform": "time", "at": "25:00"},
{
"platform": "time",
"at": {"entity_id": "input_datetime.bla", "offset": "0:10"},
},
{"platform": "time", "at": {"entity_id": "13:00:00", "offset": "0:10"}},
],
)