Make async_track_time_change smarter (#17199)

* Make async_track_time_change smarter

* Move to util/dt

* Remove unnecessary check

* Lint

* Remove tzinfo check

* Remove check

* Add comment about async_track_point_in_utc_time

* Fix typing check

* Lint
This commit is contained in:
Otto Winter 2018-10-09 10:14:18 +02:00 committed by Paulus Schoutsen
parent 9190fe1c21
commit 26cf5acd5b
5 changed files with 502 additions and 119 deletions

View File

@ -322,13 +322,13 @@ track_sunset = threaded_listener_factory(async_track_sunset)
@callback
@bind_hass
def async_track_utc_time_change(hass, action, year=None, month=None, day=None,
def async_track_utc_time_change(hass, action,
hour=None, minute=None, second=None,
local=False):
"""Add a listener that will fire if time matches a pattern."""
# We do not have to wrap the function with time pattern matching logic
# if no pattern given
if all(val is None for val in (year, month, day, hour, minute, second)):
if all(val is None for val in (hour, minute, second)):
@callback
def time_change_listener(event):
"""Fire every time event that comes in."""
@ -336,24 +336,45 @@ def async_track_utc_time_change(hass, action, year=None, month=None, day=None,
return hass.bus.async_listen(EVENT_TIME_CHANGED, time_change_listener)
pmp = _process_time_match
year, month, day = pmp(year), pmp(month), pmp(day)
hour, minute, second = pmp(hour), pmp(minute), pmp(second)
matching_seconds = dt_util.parse_time_expression(second, 0, 59)
matching_minutes = dt_util.parse_time_expression(minute, 0, 59)
matching_hours = dt_util.parse_time_expression(hour, 0, 23)
next_time = None
def calculate_next(now):
"""Calculate and set the next time the trigger should fire."""
nonlocal next_time
localized_now = dt_util.as_local(now) if local else now
next_time = dt_util.find_next_time_expression_time(
localized_now, matching_seconds, matching_minutes,
matching_hours)
# Make sure rolling back the clock doesn't prevent the timer from
# triggering.
last_now = None
@callback
def pattern_time_change_listener(event):
"""Listen for matching time_changed events."""
nonlocal next_time, last_now
now = event.data[ATTR_NOW]
if local:
now = dt_util.as_local(now)
if last_now is None or now < last_now:
# Time rolled back or next time not yet calculated
calculate_next(now)
# pylint: disable=too-many-boolean-expressions
if second(now.second) and minute(now.minute) and hour(now.hour) and \
day(now.day) and month(now.month) and year(now.year):
last_now = now
hass.async_run_job(action, now)
if next_time <= now:
hass.async_run_job(action, event.data[ATTR_NOW])
calculate_next(now + timedelta(seconds=1))
# We can't use async_track_point_in_utc_time here because it would
# break in the case that the system time abruptly jumps backwards.
# Our custom last_now logic takes care of resolving that scenario.
return hass.bus.async_listen(EVENT_TIME_CHANGED,
pattern_time_change_listener)
@ -363,11 +384,10 @@ track_utc_time_change = threaded_listener_factory(async_track_utc_time_change)
@callback
@bind_hass
def async_track_time_change(hass, action, year=None, month=None, day=None,
hour=None, minute=None, second=None):
def async_track_time_change(hass, action, hour=None, minute=None, second=None):
"""Add a listener that will fire if UTC time matches a pattern."""
return async_track_utc_time_change(hass, action, year, month, day, hour,
minute, second, local=True)
return async_track_utc_time_change(hass, action, hour, minute, second,
local=True)
track_time_change = threaded_listener_factory(async_track_time_change)
@ -383,19 +403,3 @@ def _process_state_match(parameter):
parameter = tuple(parameter)
return lambda state: state in parameter
def _process_time_match(parameter):
"""Wrap parameter in a tuple if it is not one and returns it."""
if parameter is None or parameter == MATCH_ALL:
return lambda _: True
if isinstance(parameter, str) and parameter.startswith('/'):
parameter = float(parameter[1:])
return lambda time: time % parameter == 0
if isinstance(parameter, str) or not hasattr(parameter, '__iter__'):
return lambda time: time == parameter
parameter = tuple(parameter)
return lambda time: time in parameter

View File

@ -1,10 +1,14 @@
"""Helper methods to handle the time in Home Assistant."""
import datetime as dt
import re
from typing import Any, Dict, Union, Optional, Tuple # noqa pylint: disable=unused-import
from typing import (Any, Union, Optional, # noqa pylint: disable=unused-import
Tuple, List, cast, Dict)
import pytz
import pytz.exceptions as pytzexceptions
import pytz.tzinfo as pytzinfo # noqa pylint: disable=unused-import
from homeassistant.const import MATCH_ALL
DATE_STR_FORMAT = "%Y-%m-%d"
UTC = pytz.utc
@ -209,3 +213,162 @@ def get_age(date: dt.datetime) -> str:
return formatn(minute, 'minute')
return formatn(second, 'second')
def parse_time_expression(parameter: Any, min_value: int, max_value: int) \
-> List[int]:
"""Parse the time expression part and return a list of times to match."""
if parameter is None or parameter == MATCH_ALL:
res = [x for x in range(min_value, max_value + 1)]
elif isinstance(parameter, str) and parameter.startswith('/'):
parameter = float(parameter[1:])
res = [x for x in range(min_value, max_value + 1)
if x % parameter == 0]
elif not hasattr(parameter, '__iter__'):
res = [int(parameter)]
else:
res = list(sorted(int(x) for x in parameter))
for val in res:
if val < min_value or val > max_value:
raise ValueError(
"Time expression '{}': parameter {} out of range ({} to {})"
"".format(parameter, val, min_value, max_value)
)
return res
# pylint: disable=redefined-outer-name
def find_next_time_expression_time(now: dt.datetime,
seconds: List[int], minutes: List[int],
hours: List[int]) -> dt.datetime:
"""Find the next datetime from now for which the time expression matches.
The algorithm looks at each time unit separately and tries to find the
next one that matches for each. If any of them would roll over, all
time units below that are reset to the first matching value.
Timezones are also handled (the tzinfo of the now object is used),
including daylight saving time.
"""
if not seconds or not minutes or not hours:
raise ValueError("Cannot find a next time: Time expression never "
"matches!")
def _lower_bound(arr: List[int], cmp: int) -> Optional[int]:
"""Return the first value in arr greater or equal to cmp.
Return None if no such value exists.
"""
left = 0
right = len(arr)
while left < right:
mid = (left + right) // 2
if arr[mid] < cmp:
left = mid + 1
else:
right = mid
if left == len(arr):
return None
return arr[left]
result = now.replace(microsecond=0)
# Match next second
next_second = _lower_bound(seconds, result.second)
if next_second is None:
# No second to match in this minute. Roll-over to next minute.
next_second = seconds[0]
result += dt.timedelta(minutes=1)
result = result.replace(second=next_second)
# Match next minute
next_minute = _lower_bound(minutes, result.minute)
if next_minute != result.minute:
# We're in the next minute. Seconds needs to be reset.
result = result.replace(second=seconds[0])
if next_minute is None:
# No minute to match in this hour. Roll-over to next hour.
next_minute = minutes[0]
result += dt.timedelta(hours=1)
result = result.replace(minute=next_minute)
# Match next hour
next_hour = _lower_bound(hours, result.hour)
if next_hour != result.hour:
# We're in the next hour. Seconds+minutes needs to be reset.
result.replace(second=seconds[0], minute=minutes[0])
if next_hour is None:
# No minute to match in this day. Roll-over to next day.
next_hour = hours[0]
result += dt.timedelta(days=1)
result = result.replace(hour=next_hour)
if result.tzinfo is None:
return result
# Now we need to handle timezones. We will make this datetime object
# "naive" first and then re-convert it to the target timezone.
# This is so that we can call pytz's localize and handle DST changes.
tzinfo = result.tzinfo # type: pytzinfo.DstTzInfo
result = result.replace(tzinfo=None)
try:
result = tzinfo.localize(result, is_dst=None)
except pytzexceptions.AmbiguousTimeError:
# This happens when we're leaving daylight saving time and local
# clocks are rolled back. In this case, we want to trigger
# on both the DST and non-DST time. So when "now" is in the DST
# use the DST-on time, and if not, use the DST-off time.
use_dst = bool(now.dst())
result = tzinfo.localize(result, is_dst=use_dst)
except pytzexceptions.NonExistentTimeError:
# This happens when we're entering daylight saving time and local
# clocks are rolled forward, thus there are local times that do
# not exist. In this case, we want to trigger on the next time
# that *does* exist.
# In the worst case, this will run through all the seconds in the
# time shift, but that's max 3600 operations for once per year
result = result.replace(tzinfo=tzinfo) + dt.timedelta(seconds=1)
return find_next_time_expression_time(result, seconds, minutes, hours)
result_dst = cast(dt.timedelta, result.dst())
now_dst = cast(dt.timedelta, now.dst())
if result_dst >= now_dst:
return result
# Another edge-case when leaving DST:
# When now is in DST and ambiguous *and* the next trigger time we *should*
# trigger is ambiguous and outside DST, the excepts above won't catch it.
# For example: if triggering on 2:30 and now is 28.10.2018 2:30 (in DST)
# we should trigger next on 28.10.2018 2:30 (out of DST), but our
# algorithm above would produce 29.10.2018 2:30 (out of DST)
# Step 1: Check if now is ambiguous
try:
tzinfo.localize(now.replace(tzinfo=None), is_dst=None)
return result
except pytzexceptions.AmbiguousTimeError:
pass
# Step 2: Check if result of (now - DST) is ambiguous.
check = now - now_dst
check_result = find_next_time_expression_time(
check, seconds, minutes, hours)
try:
tzinfo.localize(check_result.replace(tzinfo=None), is_dst=None)
return result
except pytzexceptions.AmbiguousTimeError:
pass
# OK, edge case does apply. We must override the DST to DST-off
check_result = tzinfo.localize(check_result.replace(tzinfo=None),
is_dst=False)
return check_result

View File

@ -251,7 +251,7 @@ fire_mqtt_message = threadsafe_callback_factory(async_fire_mqtt_message)
@ha.callback
def async_fire_time_changed(hass, time):
"""Fire a time changes event."""
hass.bus.async_fire(EVENT_TIME_CHANGED, {'now': time})
hass.bus.async_fire(EVENT_TIME_CHANGED, {'now': date_util.as_utc(time)})
fire_time_changed = threadsafe_callback_factory(async_fire_time_changed)

View File

@ -85,38 +85,6 @@ class TestEventHelpers(unittest.TestCase):
self.hass.block_till_done()
self.assertEqual(2, len(runs))
def test_track_time_change(self):
"""Test tracking time change."""
wildcard_runs = []
specific_runs = []
unsub = track_time_change(self.hass, lambda x: wildcard_runs.append(1))
unsub_utc = track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), second=[0, 30])
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(1, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 15))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(2, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
unsub()
unsub_utc()
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
def test_track_state_change(self):
"""Test track_state_change."""
# 2 lists to track how often our callbacks get called
@ -526,12 +494,64 @@ class TestEventHelpers(unittest.TestCase):
"""Send a time changed event."""
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})
class TestTrackTimeChange(unittest.TestCase):
"""Test track time change methods."""
def setUp(self):
"""Set up the tests."""
self.orig_default_time_zone = dt_util.DEFAULT_TIME_ZONE
self.hass = get_test_home_assistant()
def tearDown(self):
"""Stop everything that was started."""
dt_util.set_default_time_zone(self.orig_default_time_zone)
self.hass.stop()
def _send_time_changed(self, now):
"""Send a time changed event."""
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})
def test_track_time_change(self):
"""Test tracking time change."""
wildcard_runs = []
specific_runs = []
unsub = track_time_change(self.hass,
lambda x: wildcard_runs.append(1))
unsub_utc = track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), second=[0, 30])
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(1, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 15))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self.assertEqual(2, len(wildcard_runs))
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
unsub()
unsub_utc()
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
self.assertEqual(3, len(wildcard_runs))
def test_periodic_task_minute(self):
"""Test periodic tasks per minute."""
specific_runs = []
unsub = track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), minute='/5')
self.hass, lambda x: specific_runs.append(1), minute='/5',
second=0)
self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0))
self.hass.block_till_done()
@ -556,7 +576,8 @@ class TestEventHelpers(unittest.TestCase):
specific_runs = []
unsub = track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), hour='/2')
self.hass, lambda x: specific_runs.append(1), hour='/2',
minute=0, second=0)
self._send_time_changed(datetime(2014, 5, 24, 22, 0, 0))
self.hass.block_till_done()
@ -566,7 +587,7 @@ class TestEventHelpers(unittest.TestCase):
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 24, 0, 0, 0))
self._send_time_changed(datetime(2014, 5, 25, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
@ -584,68 +605,138 @@ class TestEventHelpers(unittest.TestCase):
self.hass.block_till_done()
self.assertEqual(3, len(specific_runs))
def test_periodic_task_day(self):
"""Test periodic tasks per day."""
specific_runs = []
unsub = track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), day='/2')
self._send_time_changed(datetime(2014, 5, 2, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 3, 12, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 4, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
unsub()
self._send_time_changed(datetime(2014, 5, 4, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
def test_periodic_task_year(self):
"""Test periodic tasks per year."""
specific_runs = []
unsub = track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), year='/2')
self._send_time_changed(datetime(2014, 5, 2, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2015, 5, 2, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2016, 5, 2, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
unsub()
self._send_time_changed(datetime(2016, 5, 2, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
def test_periodic_task_wrong_input(self):
"""Test periodic tasks with wrong input."""
specific_runs = []
with pytest.raises(ValueError):
track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), year='/two')
self.hass, lambda x: specific_runs.append(1), hour='/two')
self._send_time_changed(datetime(2014, 5, 2, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(0, len(specific_runs))
def test_periodic_task_clock_rollback(self):
"""Test periodic tasks with the time rolling backwards."""
specific_runs = []
unsub = track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), hour='/2', minute=0,
second=0)
self._send_time_changed(datetime(2014, 5, 24, 22, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 24, 23, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 24, 22, 0, 0))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 24, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(3, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 25, 2, 0, 0))
self.hass.block_till_done()
self.assertEqual(4, len(specific_runs))
unsub()
self._send_time_changed(datetime(2014, 5, 25, 2, 0, 0))
self.hass.block_till_done()
self.assertEqual(4, len(specific_runs))
def test_periodic_task_duplicate_time(self):
"""Test periodic tasks not triggering on duplicate time."""
specific_runs = []
unsub = track_utc_time_change(
self.hass, lambda x: specific_runs.append(1), hour='/2', minute=0,
second=0)
self._send_time_changed(datetime(2014, 5, 24, 22, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 24, 22, 0, 0))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(datetime(2014, 5, 25, 0, 0, 0))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
unsub()
def test_periodic_task_entering_dst(self):
"""Test periodic task behavior when entering dst."""
tz = dt_util.get_time_zone('Europe/Vienna')
dt_util.set_default_time_zone(tz)
specific_runs = []
unsub = track_time_change(
self.hass, lambda x: specific_runs.append(1), hour=2, minute=30,
second=0)
self._send_time_changed(
tz.localize(datetime(2018, 3, 25, 1, 50, 0)))
self.hass.block_till_done()
self.assertEqual(0, len(specific_runs))
self._send_time_changed(
tz.localize(datetime(2018, 3, 25, 3, 50, 0)))
self.hass.block_till_done()
self.assertEqual(0, len(specific_runs))
self._send_time_changed(
tz.localize(datetime(2018, 3, 26, 1, 50, 0)))
self.hass.block_till_done()
self.assertEqual(0, len(specific_runs))
self._send_time_changed(
tz.localize(datetime(2018, 3, 26, 2, 50, 0)))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
unsub()
def test_periodic_task_leaving_dst(self):
"""Test periodic task behavior when leaving dst."""
tz = dt_util.get_time_zone('Europe/Vienna')
dt_util.set_default_time_zone(tz)
specific_runs = []
unsub = track_time_change(
self.hass, lambda x: specific_runs.append(1), hour=2, minute=30,
second=0)
self._send_time_changed(
tz.localize(datetime(2018, 10, 28, 2, 5, 0), is_dst=False))
self.hass.block_till_done()
self.assertEqual(0, len(specific_runs))
self._send_time_changed(
tz.localize(datetime(2018, 10, 28, 2, 55, 0), is_dst=False))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(
tz.localize(datetime(2018, 10, 28, 2, 5, 0), is_dst=True))
self.hass.block_till_done()
self.assertEqual(1, len(specific_runs))
self._send_time_changed(
tz.localize(datetime(2018, 10, 28, 2, 55, 0), is_dst=True))
self.hass.block_till_done()
self.assertEqual(2, len(specific_runs))
unsub()
def test_call_later(self):
"""Test calling an action later."""
def action(): pass

View File

@ -164,3 +164,128 @@ class TestDateUtil(unittest.TestCase):
diff = dt_util.now() - timedelta(minutes=365*60*24)
self.assertEqual(dt_util.get_age(diff), "1 year")
def test_parse_time_expression(self):
"""Test parse_time_expression."""
self.assertEqual(
[x for x in range(60)],
dt_util.parse_time_expression('*', 0, 59)
)
self.assertEqual(
[x for x in range(60)],
dt_util.parse_time_expression(None, 0, 59)
)
self.assertEqual(
[x for x in range(0, 60, 5)],
dt_util.parse_time_expression('/5', 0, 59)
)
self.assertEqual(
[1, 2, 3],
dt_util.parse_time_expression([2, 1, 3], 0, 59)
)
self.assertEqual(
[x for x in range(24)],
dt_util.parse_time_expression('*', 0, 23)
)
self.assertEqual(
[42],
dt_util.parse_time_expression(42, 0, 59)
)
self.assertRaises(ValueError, dt_util.parse_time_expression, 61, 0, 60)
def test_find_next_time_expression_time_basic(self):
"""Test basic stuff for find_next_time_expression_time."""
def find(dt, hour, minute, second):
"""Call test_find_next_time_expression_time."""
seconds = dt_util.parse_time_expression(second, 0, 59)
minutes = dt_util.parse_time_expression(minute, 0, 59)
hours = dt_util.parse_time_expression(hour, 0, 23)
return dt_util.find_next_time_expression_time(
dt, seconds, minutes, hours)
self.assertEqual(
datetime(2018, 10, 7, 10, 30, 0),
find(datetime(2018, 10, 7, 10, 20, 0), '*', '/30', 0)
)
self.assertEqual(
datetime(2018, 10, 7, 10, 30, 0),
find(datetime(2018, 10, 7, 10, 30, 0), '*', '/30', 0)
)
self.assertEqual(
datetime(2018, 10, 7, 12, 30, 30),
find(datetime(2018, 10, 7, 10, 30, 0), '/3', '/30', [30, 45])
)
self.assertEqual(
datetime(2018, 10, 8, 5, 0, 0),
find(datetime(2018, 10, 7, 10, 30, 0), 5, 0, 0)
)
def test_find_next_time_expression_time_dst(self):
"""Test daylight saving time for find_next_time_expression_time."""
tz = dt_util.get_time_zone('Europe/Vienna')
dt_util.set_default_time_zone(tz)
def find(dt, hour, minute, second):
"""Call test_find_next_time_expression_time."""
seconds = dt_util.parse_time_expression(second, 0, 59)
minutes = dt_util.parse_time_expression(minute, 0, 59)
hours = dt_util.parse_time_expression(hour, 0, 23)
return dt_util.find_next_time_expression_time(
dt, seconds, minutes, hours)
# Entering DST, clocks are rolled forward
self.assertEqual(
tz.localize(datetime(2018, 3, 26, 2, 30, 0)),
find(tz.localize(datetime(2018, 3, 25, 1, 50, 0)), 2, 30, 0)
)
self.assertEqual(
tz.localize(datetime(2018, 3, 26, 2, 30, 0)),
find(tz.localize(datetime(2018, 3, 25, 3, 50, 0)), 2, 30, 0)
)
self.assertEqual(
tz.localize(datetime(2018, 3, 26, 2, 30, 0)),
find(tz.localize(datetime(2018, 3, 26, 1, 50, 0)), 2, 30, 0)
)
# Leaving DST, clocks are rolled back
self.assertEqual(
tz.localize(datetime(2018, 10, 28, 2, 30, 0), is_dst=False),
find(tz.localize(datetime(2018, 10, 28, 2, 5, 0), is_dst=False),
2, 30, 0)
)
self.assertEqual(
tz.localize(datetime(2018, 10, 28, 2, 30, 0), is_dst=False),
find(tz.localize(datetime(2018, 10, 28, 2, 55, 0), is_dst=True),
2, 30, 0)
)
self.assertEqual(
tz.localize(datetime(2018, 10, 28, 4, 30, 0), is_dst=False),
find(tz.localize(datetime(2018, 10, 28, 2, 55, 0), is_dst=True),
4, 30, 0)
)
self.assertEqual(
tz.localize(datetime(2018, 10, 28, 2, 30, 0), is_dst=True),
find(tz.localize(datetime(2018, 10, 28, 2, 5, 0), is_dst=True),
2, 30, 0)
)
self.assertEqual(
tz.localize(datetime(2018, 10, 29, 2, 30, 0)),
find(tz.localize(datetime(2018, 10, 28, 2, 55, 0), is_dst=False),
2, 30, 0)
)