Merge pull request #58905 from home-assistant/2021.10.7

This commit is contained in:
Paulus Schoutsen 2021-11-01 10:51:04 -07:00 committed by GitHub
commit 82b6bbda76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 613 additions and 85 deletions

View File

@ -39,7 +39,7 @@ class DlnaDmrData:
"""Initialize global data."""
self.lock = asyncio.Lock()
session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False)
self.requester = AiohttpSessionRequester(session, with_sleep=False)
self.requester = AiohttpSessionRequester(session, with_sleep=True)
self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
self.event_notifiers = {}
self.event_notifier_refs = defaultdict(int)

View File

@ -2,6 +2,7 @@
from __future__ import annotations
import logging
import socket
from typing import Any
from urllib.parse import ParseResult, urlparse
@ -85,8 +86,16 @@ class FritzBoxToolsFlowHandler(ConfigFlow, domain=DOMAIN):
async def async_check_configured_entry(self) -> ConfigEntry | None:
"""Check if entry is configured."""
current_host = await self.hass.async_add_executor_job(
socket.gethostbyname, self._host
)
for entry in self._async_current_entries(include_ignore=False):
if entry.data[CONF_HOST] == self._host:
entry_host = await self.hass.async_add_executor_job(
socket.gethostbyname, entry.data[CONF_HOST]
)
if entry_host == current_host:
return entry
return None

View File

@ -118,6 +118,7 @@ class KeeneticFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
host = urlparse(discovery_info[ssdp.ATTR_SSDP_LOCATION]).hostname
await self.async_set_unique_id(discovery_info[ssdp.ATTR_UPNP_UDN])
self._abort_if_unique_id_configured(updates={CONF_HOST: host})
self._async_abort_entries_match({CONF_HOST: host})

View File

@ -3,7 +3,7 @@
"name": "Mazda Connected Services",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/mazda",
"requirements": ["pymazda==0.2.1"],
"requirements": ["pymazda==0.2.2"],
"codeowners": ["@bdr99"],
"quality_scale": "platinum",
"iot_class": "cloud_polling"

View File

@ -142,7 +142,9 @@ class NetgearFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
updated_data[CONF_PORT] = DEFAULT_PORT
for model in MODELS_V2:
if discovery_info.get(ssdp.ATTR_UPNP_MODEL_NUMBER, "").startswith(model):
if discovery_info.get(ssdp.ATTR_UPNP_MODEL_NUMBER, "").startswith(
model
) or discovery_info.get(ssdp.ATTR_UPNP_MODEL_NAME, "").startswith(model):
updated_data[CONF_PORT] = ORBI_PORT
self.placeholders.update(updated_data)

View File

@ -199,6 +199,9 @@ class NetgearRouter:
ntg_devices = await self.async_get_attached_devices()
now = dt_util.utcnow()
if ntg_devices is None:
return
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug("Netgear scan result: \n%s", ntg_devices)

View File

@ -245,6 +245,7 @@ class SensorTemplate(TemplateEntity, SensorEntity):
self._friendly_name_template = friendly_name_template
self._attr_name = None
# Try to render the name as it can influence the entity ID
if friendly_name_template:
friendly_name_template.hass = hass

View File

@ -1,7 +1,7 @@
{
"domain": "tuya",
"name": "Tuya",
"documentation": "https://github.com/tuya/tuya-home-assistant",
"documentation": "https://www.home-assistant.io/integrations/tuya",
"requirements": ["tuya-iot-py-sdk==0.5.0"],
"codeowners": ["@Tuya", "@zlinoliver", "@METISU"],
"config_flow": true,

View File

@ -309,7 +309,7 @@ class MusicCastMediaPlayer(MusicCastDeviceEntity, MediaPlayerEntity):
async def async_media_stop(self):
"""Send stop command."""
if self._is_netusb:
await self.coordinator.musiccast.netusb_pause()
await self.coordinator.musiccast.netusb_stop()
else:
raise HomeAssistantError(
"Service stop is not supported for non NetUSB sources."

View File

@ -5,7 +5,7 @@ from typing import Final
MAJOR_VERSION: Final = 2021
MINOR_VERSION: Final = 10
PATCH_VERSION: Final = "6"
PATCH_VERSION: Final = "7"
__short_version__: Final = f"{MAJOR_VERSION}.{MINOR_VERSION}"
__version__: Final = f"{__short_version__}.{PATCH_VERSION}"
REQUIRED_PYTHON_VER: Final[tuple[int, int, int]] = (3, 8, 0)

View File

@ -272,7 +272,8 @@ def find_next_time_expression_time(
return None
return arr[left]
result = now.replace(microsecond=0)
# Reset microseconds and fold; fold (for ambiguous DST times) will be handled later
result = now.replace(microsecond=0, fold=0)
# Match next second
if (next_second := _lower_bound(seconds, result.second)) is None:
@ -309,40 +310,58 @@ def find_next_time_expression_time(
result = result.replace(hour=next_hour)
if result.tzinfo in (None, UTC):
# Using UTC, no DST checking needed
return result
if _datetime_ambiguous(result):
# This happens when we're leaving daylight saving time and local
# clocks are rolled back. In this case, we want to trigger
# on both the DST and non-DST time. So when "now" is in the DST
# use the DST-on time, and if not, use the DST-off time.
fold = 1 if now.dst() else 0
if result.fold != fold:
result = result.replace(fold=fold)
if not _datetime_exists(result):
# This happens when we're entering daylight saving time and local
# clocks are rolled forward, thus there are local times that do
# not exist. In this case, we want to trigger on the next time
# that *does* exist.
# In the worst case, this will run through all the seconds in the
# time shift, but that's max 3600 operations for once per year
# When entering DST and clocks are turned forward.
# There are wall clock times that don't "exist" (an hour is skipped).
# -> trigger on the next time that 1. matches the pattern and 2. does exist
# for example:
# on 2021.03.28 02:00:00 in CET timezone clocks are turned forward an hour
# with pattern "02:30", don't run on 28 mar (such a wall time does not exist on this day)
# instead run at 02:30 the next day
# We solve this edge case by just iterating one second until the result exists
# (max. 3600 operations, which should be fine for an edge case that happens once a year)
return find_next_time_expression_time(
result + dt.timedelta(seconds=1), seconds, minutes, hours
)
# Another edge-case when leaving DST:
# When now is in DST and ambiguous *and* the next trigger time we *should*
# trigger is ambiguous and outside DST, the excepts above won't catch it.
# For example: if triggering on 2:30 and now is 28.10.2018 2:30 (in DST)
# we should trigger next on 28.10.2018 2:30 (out of DST), but our
# algorithm above would produce 29.10.2018 2:30 (out of DST)
if _datetime_ambiguous(now):
now_is_ambiguous = _datetime_ambiguous(now)
result_is_ambiguous = _datetime_ambiguous(result)
# When leaving DST and clocks are turned backward.
# Then there are wall clock times that are ambiguous i.e. exist with DST and without DST
# The logic above does not take into account if a given pattern matches _twice_
# in a day.
# Example: on 2021.10.31 02:00:00 in CET timezone clocks are turned backward an hour
if now_is_ambiguous and result_is_ambiguous:
# `now` and `result` are both ambiguous, so the next match happens
# _within_ the current fold.
# Examples:
# 1. 2021.10.31 02:00:00+02:00 with pattern 02:30 -> 2021.10.31 02:30:00+02:00
# 2. 2021.10.31 02:00:00+01:00 with pattern 02:30 -> 2021.10.31 02:30:00+01:00
return result.replace(fold=now.fold)
if now_is_ambiguous and now.fold == 0 and not result_is_ambiguous:
# `now` is in the first fold, but result is not ambiguous (meaning it no longer matches
# within the fold).
# -> Check if result matches in the next fold. If so, emit that match
# Turn back the time by the DST offset, effectively run the algorithm on the first fold
# If it matches on the first fold, that means it will also match on the second one.
# Example: 2021.10.31 02:45:00+02:00 with pattern 02:30 -> 2021.10.31 02:30:00+01:00
check_result = find_next_time_expression_time(
now + _dst_offset_diff(now), seconds, minutes, hours
)
if _datetime_ambiguous(check_result):
return check_result
return check_result.replace(fold=1)
return result

View File

@ -1619,7 +1619,7 @@ pymailgunner==1.4
pymata-express==1.19
# homeassistant.components.mazda
pymazda==0.2.1
pymazda==0.2.2
# homeassistant.components.mediaroom
pymediaroom==0.6.4.1

View File

@ -950,7 +950,7 @@ pymailgunner==1.4
pymata-express==1.19
# homeassistant.components.mazda
pymazda==0.2.1
pymazda==0.2.2
# homeassistant.components.melcloud
pymelcloud==2.5.4

View File

@ -41,6 +41,7 @@ ATTR_HOST = "host"
ATTR_NEW_SERIAL_NUMBER = "NewSerialNumber"
MOCK_HOST = "fake_host"
MOCK_IP = "192.168.178.1"
MOCK_SERIAL_NUMBER = "fake_serial_number"
MOCK_FIRMWARE_INFO = [True, "1.1.1"]
@ -51,7 +52,7 @@ MOCK_DEVICE_INFO = {
}
MOCK_IMPORT_CONFIG = {CONF_HOST: MOCK_HOST, CONF_USERNAME: "username"}
MOCK_SSDP_DATA = {
ATTR_SSDP_LOCATION: "https://fake_host:12345/test",
ATTR_SSDP_LOCATION: f"https://{MOCK_IP}:12345/test",
ATTR_UPNP_FRIENDLY_NAME: "fake_name",
ATTR_UPNP_UDN: "uuid:only-a-test",
}
@ -81,7 +82,10 @@ async def test_user(hass: HomeAssistant, fc_class_mock, mock_get_source_ip):
"requests.get"
) as mock_request_get, patch(
"requests.post"
) as mock_request_post:
) as mock_request_post, patch(
"homeassistant.components.fritz.config_flow.socket.gethostbyname",
return_value=MOCK_IP,
):
mock_request_get.return_value.status_code = 200
mock_request_get.return_value.content = MOCK_REQUEST
@ -129,7 +133,10 @@ async def test_user_already_configured(
"requests.get"
) as mock_request_get, patch(
"requests.post"
) as mock_request_post:
) as mock_request_post, patch(
"homeassistant.components.fritz.config_flow.socket.gethostbyname",
return_value=MOCK_IP,
):
mock_request_get.return_value.status_code = 200
mock_request_get.return_value.content = MOCK_REQUEST
@ -319,7 +326,10 @@ async def test_ssdp_already_configured(
with patch(
"homeassistant.components.fritz.common.FritzConnection",
side_effect=fc_class_mock,
), patch("homeassistant.components.fritz.common.FritzStatus"):
), patch("homeassistant.components.fritz.common.FritzStatus"), patch(
"homeassistant.components.fritz.config_flow.socket.gethostbyname",
return_value=MOCK_IP,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_SSDP}, data=MOCK_SSDP_DATA
@ -343,7 +353,10 @@ async def test_ssdp_already_configured_host(
with patch(
"homeassistant.components.fritz.common.FritzConnection",
side_effect=fc_class_mock,
), patch("homeassistant.components.fritz.common.FritzStatus"):
), patch("homeassistant.components.fritz.common.FritzStatus"), patch(
"homeassistant.components.fritz.config_flow.socket.gethostbyname",
return_value=MOCK_IP,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_SSDP}, data=MOCK_SSDP_DATA
@ -367,7 +380,10 @@ async def test_ssdp_already_configured_host_uuid(
with patch(
"homeassistant.components.fritz.common.FritzConnection",
side_effect=fc_class_mock,
), patch("homeassistant.components.fritz.common.FritzStatus"):
), patch("homeassistant.components.fritz.common.FritzStatus"), patch(
"homeassistant.components.fritz.config_flow.socket.gethostbyname",
return_value=MOCK_IP,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_SSDP}, data=MOCK_SSDP_DATA
@ -436,7 +452,7 @@ async def test_ssdp(hass: HomeAssistant, fc_class_mock, mock_get_source_ip):
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["data"][CONF_HOST] == "fake_host"
assert result["data"][CONF_HOST] == MOCK_IP
assert result["data"][CONF_PASSWORD] == "fake_pass"
assert result["data"][CONF_USERNAME] == "fake_user"
@ -482,7 +498,10 @@ async def test_import(hass: HomeAssistant, fc_class_mock, mock_get_source_ip):
"requests.get"
) as mock_request_get, patch(
"requests.post"
) as mock_request_post:
) as mock_request_post, patch(
"homeassistant.components.fritz.config_flow.socket.gethostbyname",
return_value=MOCK_IP,
):
mock_request_get.return_value.status_code = 200
mock_request_get.return_value.content = MOCK_REQUEST

View File

@ -211,6 +211,56 @@ async def test_ssdp_already_configured(hass: HomeAssistant) -> None:
assert result["reason"] == "already_configured"
async def test_ssdp_ignored(hass: HomeAssistant) -> None:
"""Test unique ID ignored and discovered."""
entry = MockConfigEntry(
domain=keenetic.DOMAIN,
source=config_entries.SOURCE_IGNORE,
unique_id=MOCK_SSDP_DISCOVERY_INFO[ssdp.ATTR_UPNP_UDN],
)
entry.add_to_hass(hass)
discovery_info = MOCK_SSDP_DISCOVERY_INFO.copy()
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_SSDP},
data=discovery_info,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
async def test_ssdp_update_host(hass: HomeAssistant) -> None:
"""Test unique ID configured and discovered with the new host."""
entry = MockConfigEntry(
domain=keenetic.DOMAIN,
data=MOCK_DATA,
options=MOCK_OPTIONS,
unique_id=MOCK_SSDP_DISCOVERY_INFO[ssdp.ATTR_UPNP_UDN],
)
entry.add_to_hass(hass)
new_ip = "10.10.10.10"
discovery_info = {
**MOCK_SSDP_DISCOVERY_INFO,
ssdp.ATTR_SSDP_LOCATION: f"http://{new_ip}/",
}
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_SSDP},
data=discovery_info,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert entry.data[CONF_HOST] == new_ip
async def test_ssdp_reject_no_udn(hass: HomeAssistant) -> None:
"""Discovered device has no UDN."""

View File

@ -115,7 +115,7 @@ async def test_entity_picture_template(hass, start_ha):
@pytest.mark.parametrize("count,domain", [(1, sensor.DOMAIN)])
@pytest.mark.parametrize(
"attribute,config",
"attribute,config,expected",
[
(
"friendly_name",
@ -130,6 +130,22 @@ async def test_entity_picture_template(hass, start_ha):
},
},
},
("It .", "It Works."),
),
(
"friendly_name",
{
"sensor": {
"platform": "template",
"sensors": {
"test_template_sensor": {
"value_template": "{{ states.sensor.test_state.state }}",
"friendly_name_template": "{{ 'It ' + states.sensor.test_state.state + '.'}}",
}
},
},
},
(None, "It Works."),
),
(
"friendly_name",
@ -144,6 +160,7 @@ async def test_entity_picture_template(hass, start_ha):
},
},
},
("It .", "It Works."),
),
(
"test_attribute",
@ -160,16 +177,17 @@ async def test_entity_picture_template(hass, start_ha):
},
},
},
("It .", "It Works."),
),
],
)
async def test_friendly_name_template(hass, attribute, start_ha):
async def test_friendly_name_template(hass, attribute, expected, start_ha):
"""Test friendly_name template with an unknown value_template."""
assert hass.states.get(TEST_NAME).attributes.get(attribute) == "It ."
assert hass.states.get(TEST_NAME).attributes.get(attribute) == expected[0]
hass.states.async_set("sensor.test_state", "Works")
await hass.async_block_till_done()
assert hass.states.get(TEST_NAME).attributes[attribute] == "It Works."
assert hass.states.get(TEST_NAME).attributes[attribute] == expected[1]
@pytest.mark.parametrize("count,domain", [(0, sensor.DOMAIN)])

View File

@ -2907,9 +2907,19 @@ async def test_periodic_task_entering_dst(hass):
dt_util.set_default_time_zone(timezone)
specific_runs = []
now = dt_util.utcnow()
# DST starts early morning March 27th 2022
yy = 2022
mm = 3
dd = 27
# There's no 2022-03-27 02:30, the event should not fire until 2022-03-28 02:30
time_that_will_not_match_right_away = datetime(
now.year + 1, 3, 25, 2, 31, 0, tzinfo=timezone
yy, mm, dd, 1, 28, 0, tzinfo=timezone, fold=0
)
# Make sure we enter DST during the test
assert (
time_that_will_not_match_right_away.utcoffset()
!= (time_that_will_not_match_right_away + timedelta(hours=2)).utcoffset()
)
with patch(
@ -2924,25 +2934,25 @@ async def test_periodic_task_entering_dst(hass):
)
async_fire_time_changed(
hass, datetime(now.year + 1, 3, 25, 1, 50, 0, 999999, tzinfo=timezone)
hass, datetime(yy, mm, dd, 1, 50, 0, 999999, tzinfo=timezone)
)
await hass.async_block_till_done()
assert len(specific_runs) == 0
async_fire_time_changed(
hass, datetime(now.year + 1, 3, 25, 3, 50, 0, 999999, tzinfo=timezone)
hass, datetime(yy, mm, dd, 3, 50, 0, 999999, tzinfo=timezone)
)
await hass.async_block_till_done()
assert len(specific_runs) == 0
async_fire_time_changed(
hass, datetime(now.year + 1, 3, 26, 1, 50, 0, 999999, tzinfo=timezone)
hass, datetime(yy, mm, dd + 1, 1, 50, 0, 999999, tzinfo=timezone)
)
await hass.async_block_till_done()
assert len(specific_runs) == 0
async_fire_time_changed(
hass, datetime(now.year + 1, 3, 26, 2, 50, 0, 999999, tzinfo=timezone)
hass, datetime(yy, mm, dd + 1, 2, 50, 0, 999999, tzinfo=timezone)
)
await hass.async_block_till_done()
assert len(specific_runs) == 1
@ -2956,10 +2966,19 @@ async def test_periodic_task_leaving_dst(hass):
dt_util.set_default_time_zone(timezone)
specific_runs = []
now = dt_util.utcnow()
# DST ends early morning Ocotber 30th 2022
yy = 2022
mm = 10
dd = 30
time_that_will_not_match_right_away = datetime(
now.year + 1, 10, 28, 2, 28, 0, tzinfo=timezone, fold=1
yy, mm, dd, 2, 28, 0, tzinfo=timezone, fold=0
)
# Make sure we leave DST during the test
assert (
time_that_will_not_match_right_away.utcoffset()
!= time_that_will_not_match_right_away.replace(fold=1).utcoffset()
)
with patch(
@ -2973,38 +2992,134 @@ async def test_periodic_task_leaving_dst(hass):
second=0,
)
# The task should not fire yet
async_fire_time_changed(
hass, datetime(now.year + 1, 10, 28, 2, 5, 0, 999999, tzinfo=timezone, fold=0)
hass, datetime(yy, mm, dd, 2, 28, 0, 999999, tzinfo=timezone, fold=0)
)
await hass.async_block_till_done()
assert len(specific_runs) == 0
# The task should fire
async_fire_time_changed(
hass, datetime(now.year + 1, 10, 28, 2, 55, 0, 999999, tzinfo=timezone, fold=0)
hass, datetime(yy, mm, dd, 2, 30, 0, 999999, tzinfo=timezone, fold=0)
)
await hass.async_block_till_done()
assert len(specific_runs) == 1
# The task should not fire again
async_fire_time_changed(
hass, datetime(yy, mm, dd, 2, 55, 0, 999999, tzinfo=timezone, fold=0)
)
await hass.async_block_till_done()
assert len(specific_runs) == 1
# DST has ended, the task should not fire yet
async_fire_time_changed(
hass,
datetime(now.year + 2, 10, 28, 2, 45, 0, 999999, tzinfo=timezone, fold=1),
datetime(yy, mm, dd, 2, 15, 0, 999999, tzinfo=timezone, fold=1),
)
await hass.async_block_till_done()
assert len(specific_runs) == 1
# The task should fire
async_fire_time_changed(
hass,
datetime(yy, mm, dd, 2, 45, 0, 999999, tzinfo=timezone, fold=1),
)
await hass.async_block_till_done()
assert len(specific_runs) == 2
# The task should not fire again
async_fire_time_changed(
hass,
datetime(now.year + 2, 10, 28, 2, 55, 0, 999999, tzinfo=timezone, fold=1),
datetime(yy, mm, dd, 2, 55, 0, 999999, tzinfo=timezone, fold=1),
)
await hass.async_block_till_done()
assert len(specific_runs) == 2
# The task should fire again the next day
async_fire_time_changed(
hass, datetime(now.year + 2, 10, 28, 2, 55, 0, 999999, tzinfo=timezone, fold=1)
hass, datetime(yy, mm, dd + 1, 2, 55, 0, 999999, tzinfo=timezone, fold=1)
)
await hass.async_block_till_done()
assert len(specific_runs) == 3
unsub()
async def test_periodic_task_leaving_dst_2(hass):
"""Test periodic task behavior when leaving dst."""
timezone = dt_util.get_time_zone("Europe/Vienna")
dt_util.set_default_time_zone(timezone)
specific_runs = []
# DST ends early morning Ocotber 30th 2022
yy = 2022
mm = 10
dd = 30
time_that_will_not_match_right_away = datetime(
yy, mm, dd, 2, 28, 0, tzinfo=timezone, fold=0
)
# Make sure we leave DST during the test
assert (
time_that_will_not_match_right_away.utcoffset()
!= time_that_will_not_match_right_away.replace(fold=1).utcoffset()
)
with patch(
"homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away
):
unsub = async_track_time_change(
hass,
callback(lambda x: specific_runs.append(x)),
minute=30,
second=0,
)
# The task should not fire yet
async_fire_time_changed(
hass, datetime(yy, mm, dd, 2, 28, 0, 999999, tzinfo=timezone, fold=0)
)
await hass.async_block_till_done()
assert len(specific_runs) == 0
# The task should fire
async_fire_time_changed(
hass, datetime(yy, mm, dd, 2, 55, 0, 999999, tzinfo=timezone, fold=0)
)
await hass.async_block_till_done()
assert len(specific_runs) == 1
# DST has ended, the task should not fire yet
async_fire_time_changed(
hass, datetime(yy, mm, dd, 2, 15, 0, 999999, tzinfo=timezone, fold=1)
)
await hass.async_block_till_done()
assert len(specific_runs) == 1
# The task should fire
async_fire_time_changed(
hass, datetime(yy, mm, dd, 2, 45, 0, 999999, tzinfo=timezone, fold=1)
)
await hass.async_block_till_done()
assert len(specific_runs) == 2
# The task should not fire again
async_fire_time_changed(
hass,
datetime(yy, mm, dd, 2, 55, 0, 999999, tzinfo=timezone, fold=1),
)
await hass.async_block_till_done()
assert len(specific_runs) == 2
# The task should fire again the next hour
async_fire_time_changed(
hass, datetime(yy, mm, dd, 3, 55, 0, 999999, tzinfo=timezone, fold=0)
)
await hass.async_block_till_done()
assert len(specific_runs) == 3
unsub()

View File

@ -224,120 +224,411 @@ def test_find_next_time_expression_time_dst():
tz = dt_util.get_time_zone("Europe/Vienna")
dt_util.set_default_time_zone(tz)
def find(dt, hour, minute, second):
def find(dt, hour, minute, second) -> datetime:
"""Call test_find_next_time_expression_time."""
seconds = dt_util.parse_time_expression(second, 0, 59)
minutes = dt_util.parse_time_expression(minute, 0, 59)
hours = dt_util.parse_time_expression(hour, 0, 23)
return dt_util.find_next_time_expression_time(dt, seconds, minutes, hours)
local = dt_util.find_next_time_expression_time(dt, seconds, minutes, hours)
return dt_util.as_utc(local)
# Entering DST, clocks are rolled forward
assert datetime(2018, 3, 26, 2, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2018, 3, 26, 2, 30, 0, tzinfo=tz)) == find(
datetime(2018, 3, 25, 1, 50, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2018, 3, 26, 2, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2018, 3, 26, 2, 30, 0, tzinfo=tz)) == find(
datetime(2018, 3, 25, 3, 50, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2018, 3, 26, 2, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2018, 3, 26, 2, 30, 0, tzinfo=tz)) == find(
datetime(2018, 3, 26, 1, 50, 0, tzinfo=tz), 2, 30, 0
)
# Leaving DST, clocks are rolled back
assert datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=0) == find(
assert dt_util.as_utc(datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=0)) == find(
datetime(2018, 10, 28, 2, 5, 0, tzinfo=tz, fold=0), 2, 30, 0
)
assert datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=0) == find(
assert dt_util.as_utc(datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=0)) == find(
datetime(2018, 10, 28, 2, 5, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=1) == find(
assert dt_util.as_utc(datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=1)) == find(
datetime(2018, 10, 28, 2, 55, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=1) == find(
assert dt_util.as_utc(datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=1)) == find(
datetime(2018, 10, 28, 2, 55, 0, tzinfo=tz, fold=0), 2, 30, 0
)
assert datetime(2018, 10, 28, 4, 30, 0, tzinfo=tz, fold=0) == find(
assert dt_util.as_utc(datetime(2018, 10, 28, 4, 30, 0, tzinfo=tz, fold=0)) == find(
datetime(2018, 10, 28, 2, 55, 0, tzinfo=tz, fold=1), 4, 30, 0
)
assert datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=1) == find(
assert dt_util.as_utc(datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=1)) == find(
datetime(2018, 10, 28, 2, 5, 0, tzinfo=tz, fold=1), 2, 30, 0
)
assert datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=1) == find(
assert dt_util.as_utc(datetime(2018, 10, 28, 2, 30, 0, tzinfo=tz, fold=1)) == find(
datetime(2018, 10, 28, 2, 55, 0, tzinfo=tz, fold=0), 2, 30, 0
)
# DST begins on 2021.03.28 2:00, clocks were turned forward 1h; 2:00-3:00 time does not exist
@pytest.mark.parametrize(
"now_dt, expected_dt",
[
# 00:00 -> 2:30
(
datetime(2021, 3, 28, 0, 0, 0),
datetime(2021, 3, 29, 2, 30, 0),
),
],
)
def test_find_next_time_expression_entering_dst(now_dt, expected_dt):
"""Test entering daylight saving time for find_next_time_expression_time."""
tz = dt_util.get_time_zone("Europe/Vienna")
dt_util.set_default_time_zone(tz)
# match on 02:30:00 every day
pattern_seconds = dt_util.parse_time_expression(0, 0, 59)
pattern_minutes = dt_util.parse_time_expression(30, 0, 59)
pattern_hours = dt_util.parse_time_expression(2, 0, 59)
now_dt = now_dt.replace(tzinfo=tz)
expected_dt = expected_dt.replace(tzinfo=tz)
res_dt = dt_util.find_next_time_expression_time(
now_dt, pattern_seconds, pattern_minutes, pattern_hours
)
assert dt_util.as_utc(res_dt) == dt_util.as_utc(expected_dt)
# DST ends on 2021.10.31 2:00, clocks were turned backward 1h; 2:00-3:00 time is ambiguous
@pytest.mark.parametrize(
"now_dt, expected_dt",
[
# 00:00 -> 2:30
(
datetime(2021, 10, 31, 0, 0, 0),
datetime(2021, 10, 31, 2, 30, 0, fold=0),
),
# 02:00(0) -> 2:30(0)
(
datetime(2021, 10, 31, 2, 0, 0, fold=0),
datetime(2021, 10, 31, 2, 30, 0, fold=0),
),
# 02:15(0) -> 2:30(0)
(
datetime(2021, 10, 31, 2, 15, 0, fold=0),
datetime(2021, 10, 31, 2, 30, 0, fold=0),
),
# 02:30:00(0) -> 2:30(1)
(
datetime(2021, 10, 31, 2, 30, 0, fold=0),
datetime(2021, 10, 31, 2, 30, 0, fold=0),
),
# 02:30:01(0) -> 2:30(1)
(
datetime(2021, 10, 31, 2, 30, 1, fold=0),
datetime(2021, 10, 31, 2, 30, 0, fold=1),
),
# 02:45(0) -> 2:30(1)
(
datetime(2021, 10, 31, 2, 45, 0, fold=0),
datetime(2021, 10, 31, 2, 30, 0, fold=1),
),
# 02:00(1) -> 2:30(1)
(
datetime(2021, 10, 31, 2, 0, 0, fold=1),
datetime(2021, 10, 31, 2, 30, 0, fold=1),
),
# 02:15(1) -> 2:30(1)
(
datetime(2021, 10, 31, 2, 15, 0, fold=1),
datetime(2021, 10, 31, 2, 30, 0, fold=1),
),
# 02:30:00(1) -> 2:30(1)
(
datetime(2021, 10, 31, 2, 30, 0, fold=1),
datetime(2021, 10, 31, 2, 30, 0, fold=1),
),
# 02:30:01(1) -> 2:30 next day
(
datetime(2021, 10, 31, 2, 30, 1, fold=1),
datetime(2021, 11, 1, 2, 30, 0),
),
# 02:45(1) -> 2:30 next day
(
datetime(2021, 10, 31, 2, 45, 0, fold=1),
datetime(2021, 11, 1, 2, 30, 0),
),
# 08:00(1) -> 2:30 next day
(
datetime(2021, 10, 31, 8, 0, 1),
datetime(2021, 11, 1, 2, 30, 0),
),
],
)
def test_find_next_time_expression_exiting_dst(now_dt, expected_dt):
"""Test exiting daylight saving time for find_next_time_expression_time."""
tz = dt_util.get_time_zone("Europe/Vienna")
dt_util.set_default_time_zone(tz)
# match on 02:30:00 every day
pattern_seconds = dt_util.parse_time_expression(0, 0, 59)
pattern_minutes = dt_util.parse_time_expression(30, 0, 59)
pattern_hours = dt_util.parse_time_expression(2, 0, 59)
now_dt = now_dt.replace(tzinfo=tz)
expected_dt = expected_dt.replace(tzinfo=tz)
res_dt = dt_util.find_next_time_expression_time(
now_dt, pattern_seconds, pattern_minutes, pattern_hours
)
assert dt_util.as_utc(res_dt) == dt_util.as_utc(expected_dt)
def test_find_next_time_expression_time_dst_chicago():
"""Test daylight saving time for find_next_time_expression_time."""
tz = dt_util.get_time_zone("America/Chicago")
dt_util.set_default_time_zone(tz)
def find(dt, hour, minute, second):
def find(dt, hour, minute, second) -> datetime:
"""Call test_find_next_time_expression_time."""
seconds = dt_util.parse_time_expression(second, 0, 59)
minutes = dt_util.parse_time_expression(minute, 0, 59)
hours = dt_util.parse_time_expression(hour, 0, 23)
return dt_util.find_next_time_expression_time(dt, seconds, minutes, hours)
local = dt_util.find_next_time_expression_time(dt, seconds, minutes, hours)
return dt_util.as_utc(local)
# Entering DST, clocks are rolled forward
assert datetime(2021, 3, 15, 2, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2021, 3, 15, 2, 30, 0, tzinfo=tz)) == find(
datetime(2021, 3, 14, 1, 50, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2021, 3, 15, 2, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2021, 3, 15, 2, 30, 0, tzinfo=tz)) == find(
datetime(2021, 3, 14, 3, 50, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2021, 3, 15, 2, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2021, 3, 15, 2, 30, 0, tzinfo=tz)) == find(
datetime(2021, 3, 14, 1, 50, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2021, 3, 14, 3, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2021, 3, 14, 3, 30, 0, tzinfo=tz)) == find(
datetime(2021, 3, 14, 1, 50, 0, tzinfo=tz), 3, 30, 0
)
# Leaving DST, clocks are rolled back
assert datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=0) == find(
assert dt_util.as_utc(datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=0)) == find(
datetime(2021, 11, 7, 2, 5, 0, tzinfo=tz, fold=0), 2, 30, 0
)
assert datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz)) == find(
datetime(2021, 11, 7, 2, 5, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=0) == find(
assert dt_util.as_utc(datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=0)) == find(
datetime(2021, 11, 7, 2, 5, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=1) == find(
assert dt_util.as_utc(datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=1)) == find(
datetime(2021, 11, 7, 2, 10, 0, tzinfo=tz), 2, 30, 0
)
assert datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=1) == find(
assert dt_util.as_utc(datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=1)) == find(
datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=0), 2, 30, 0
)
assert datetime(2021, 11, 8, 2, 30, 0, tzinfo=tz, fold=1) == find(
assert dt_util.as_utc(datetime(2021, 11, 8, 2, 30, 0, tzinfo=tz, fold=1)) == find(
datetime(2021, 11, 7, 2, 55, 0, tzinfo=tz, fold=0), 2, 30, 0
)
assert datetime(2021, 11, 7, 4, 30, 0, tzinfo=tz, fold=0) == find(
assert dt_util.as_utc(datetime(2021, 11, 7, 4, 30, 0, tzinfo=tz, fold=0)) == find(
datetime(2021, 11, 7, 2, 55, 0, tzinfo=tz, fold=1), 4, 30, 0
)
assert datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=1) == find(
assert dt_util.as_utc(datetime(2021, 11, 7, 2, 30, 0, tzinfo=tz, fold=1)) == find(
datetime(2021, 11, 7, 2, 5, 0, tzinfo=tz, fold=1), 2, 30, 0
)
assert datetime(2021, 11, 8, 2, 30, 0, tzinfo=tz) == find(
assert dt_util.as_utc(datetime(2021, 11, 8, 2, 30, 0, tzinfo=tz)) == find(
datetime(2021, 11, 7, 2, 55, 0, tzinfo=tz, fold=0), 2, 30, 0
)
def _get_matches(hours, minutes, seconds):
matching_hours = dt_util.parse_time_expression(hours, 0, 23)
matching_minutes = dt_util.parse_time_expression(minutes, 0, 59)
matching_seconds = dt_util.parse_time_expression(seconds, 0, 59)
return matching_hours, matching_minutes, matching_seconds
def test_find_next_time_expression_day_before_dst_change_the_same_time():
"""Test the day before DST to establish behavior without DST."""
tz = dt_util.get_time_zone("America/Chicago")
dt_util.set_default_time_zone(tz)
# Not in DST yet
hour_minute_second = (12, 30, 1)
test_time = datetime(2021, 10, 7, *hour_minute_second, tzinfo=tz, fold=0)
matching_hours, matching_minutes, matching_seconds = _get_matches(
*hour_minute_second
)
next_time = dt_util.find_next_time_expression_time(
test_time, matching_seconds, matching_minutes, matching_hours
)
assert next_time == datetime(2021, 10, 7, *hour_minute_second, tzinfo=tz, fold=0)
assert next_time.fold == 0
assert dt_util.as_utc(next_time) == datetime(
2021, 10, 7, 17, 30, 1, tzinfo=dt_util.UTC
)
def test_find_next_time_expression_time_leave_dst_chicago_before_the_fold_30_s():
"""Test leaving daylight saving time for find_next_time_expression_time 30s into the future."""
tz = dt_util.get_time_zone("America/Chicago")
dt_util.set_default_time_zone(tz)
# Leaving DST, clocks are rolled back
# Move ahead 30 seconds not folded yet
hour_minute_second = (1, 30, 31)
test_time = datetime(2021, 11, 7, 1, 30, 1, tzinfo=tz, fold=0)
matching_hours, matching_minutes, matching_seconds = _get_matches(
*hour_minute_second
)
next_time = dt_util.find_next_time_expression_time(
test_time, matching_seconds, matching_minutes, matching_hours
)
assert next_time == datetime(2021, 11, 7, 1, 30, 31, tzinfo=tz, fold=0)
assert dt_util.as_utc(next_time) == datetime(
2021, 11, 7, 6, 30, 31, tzinfo=dt_util.UTC
)
assert next_time.fold == 0
def test_find_next_time_expression_time_leave_dst_chicago_before_the_fold_same_time():
"""Test leaving daylight saving time for find_next_time_expression_time with the same time."""
tz = dt_util.get_time_zone("America/Chicago")
dt_util.set_default_time_zone(tz)
# Leaving DST, clocks are rolled back
# Move to the same time not folded yet
hour_minute_second = (0, 30, 1)
test_time = datetime(2021, 11, 7, *hour_minute_second, tzinfo=tz, fold=0)
matching_hours, matching_minutes, matching_seconds = _get_matches(
*hour_minute_second
)
next_time = dt_util.find_next_time_expression_time(
test_time, matching_seconds, matching_minutes, matching_hours
)
assert next_time == datetime(2021, 11, 7, *hour_minute_second, tzinfo=tz, fold=0)
assert dt_util.as_utc(next_time) == datetime(
2021, 11, 7, 5, 30, 1, tzinfo=dt_util.UTC
)
assert next_time.fold == 0
def test_find_next_time_expression_time_leave_dst_chicago_into_the_fold_same_time():
"""Test leaving daylight saving time for find_next_time_expression_time."""
tz = dt_util.get_time_zone("America/Chicago")
dt_util.set_default_time_zone(tz)
# Leaving DST, clocks are rolled back
# Find the same time inside the fold
hour_minute_second = (1, 30, 1)
test_time = datetime(2021, 11, 7, *hour_minute_second, tzinfo=tz, fold=0)
matching_hours, matching_minutes, matching_seconds = _get_matches(
*hour_minute_second
)
next_time = dt_util.find_next_time_expression_time(
test_time, matching_seconds, matching_minutes, matching_hours
)
assert next_time == datetime(2021, 11, 7, *hour_minute_second, tzinfo=tz, fold=1)
assert next_time.fold == 0
assert dt_util.as_utc(next_time) == datetime(
2021, 11, 7, 6, 30, 1, tzinfo=dt_util.UTC
)
def test_find_next_time_expression_time_leave_dst_chicago_into_the_fold_ahead_1_hour_10_min():
"""Test leaving daylight saving time for find_next_time_expression_time."""
tz = dt_util.get_time_zone("America/Chicago")
dt_util.set_default_time_zone(tz)
# Leaving DST, clocks are rolled back
# Find 1h 10m after into the fold
# Start at 01:30:01 fold=0
# Reach to 01:20:01 fold=1
hour_minute_second = (1, 20, 1)
test_time = datetime(2021, 11, 7, 1, 30, 1, tzinfo=tz, fold=0)
matching_hours, matching_minutes, matching_seconds = _get_matches(
*hour_minute_second
)
next_time = dt_util.find_next_time_expression_time(
test_time, matching_seconds, matching_minutes, matching_hours
)
assert next_time == datetime(2021, 11, 7, *hour_minute_second, tzinfo=tz, fold=1)
assert next_time.fold == 1 # time is ambiguous
assert dt_util.as_utc(next_time) == datetime(
2021, 11, 7, 7, 20, 1, tzinfo=dt_util.UTC
)
def test_find_next_time_expression_time_leave_dst_chicago_inside_the_fold_ahead_10_min():
"""Test leaving daylight saving time for find_next_time_expression_time."""
tz = dt_util.get_time_zone("America/Chicago")
dt_util.set_default_time_zone(tz)
# Leaving DST, clocks are rolled back
# Find 10m later while we are in the fold
# Start at 01:30:01 fold=0
# Reach to 01:40:01 fold=1
hour_minute_second = (1, 40, 1)
test_time = datetime(2021, 11, 7, 1, 30, 1, tzinfo=tz, fold=1)
matching_hours, matching_minutes, matching_seconds = _get_matches(
*hour_minute_second
)
next_time = dt_util.find_next_time_expression_time(
test_time, matching_seconds, matching_minutes, matching_hours
)
assert next_time == datetime(2021, 11, 7, *hour_minute_second, tzinfo=tz, fold=1)
assert next_time.fold == 1 # time is ambiguous
assert dt_util.as_utc(next_time) == datetime(
2021, 11, 7, 7, 40, 1, tzinfo=dt_util.UTC
)
def test_find_next_time_expression_time_leave_dst_chicago_past_the_fold_ahead_2_hour_10_min():
"""Test leaving daylight saving time for find_next_time_expression_time."""
tz = dt_util.get_time_zone("America/Chicago")
dt_util.set_default_time_zone(tz)
# Leaving DST, clocks are rolled back
# Find 1h 10m after into the fold
# Start at 01:30:01 fold=0
# Reach to 02:20:01 past the fold
hour_minute_second = (2, 20, 1)
test_time = datetime(2021, 11, 7, 1, 30, 1, tzinfo=tz, fold=0)
matching_hours, matching_minutes, matching_seconds = _get_matches(
*hour_minute_second
)
next_time = dt_util.find_next_time_expression_time(
test_time, matching_seconds, matching_minutes, matching_hours
)
assert next_time == datetime(2021, 11, 7, *hour_minute_second, tzinfo=tz, fold=1)
assert next_time.fold == 0 # Time is no longer ambiguous
assert dt_util.as_utc(next_time) == datetime(
2021, 11, 7, 8, 20, 1, tzinfo=dt_util.UTC
)