Core track same state for a period / Allow on platforms (#9273)

* Core track state period / Allow on platforms

* Add tests

* fix lint

* fix tests

* add new tracker to automation state

* update schema

* fix bug

* revert validate string

* Fix bug

* Set arguments to async_check_funct

* add logic into numeric_state

* fix numeric_state

* Add tests

* fix retrigger state

* cleanup

* Add delay function to template binary_sensor

* Fix tests & lint

* add more tests

* fix lint

* Address comments

* fix test & lint
This commit is contained in:
Pascal Vizeli 2017-09-05 02:01:01 +02:00 committed by GitHub
parent 67828cb7a2
commit ed699896cb
8 changed files with 548 additions and 88 deletions

View File

@ -12,16 +12,18 @@ import voluptuous as vol
from homeassistant.core import callback
from homeassistant.const import (
CONF_VALUE_TEMPLATE, CONF_PLATFORM, CONF_ENTITY_ID,
CONF_BELOW, CONF_ABOVE)
from homeassistant.helpers.event import async_track_state_change
CONF_BELOW, CONF_ABOVE, CONF_FOR)
from homeassistant.helpers.event import (
async_track_state_change, async_track_same_state)
from homeassistant.helpers import condition, config_validation as cv
TRIGGER_SCHEMA = vol.All(vol.Schema({
vol.Required(CONF_PLATFORM): 'numeric_state',
vol.Required(CONF_ENTITY_ID): cv.entity_ids,
CONF_BELOW: vol.Coerce(float),
CONF_ABOVE: vol.Coerce(float),
vol.Optional(CONF_BELOW): vol.Coerce(float),
vol.Optional(CONF_ABOVE): vol.Coerce(float),
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_FOR): vol.All(cv.time_period, cv.positive_timedelta),
}), cv.has_at_least_one_key(CONF_BELOW, CONF_ABOVE))
_LOGGER = logging.getLogger(__name__)
@ -33,15 +35,18 @@ def async_trigger(hass, config, action):
entity_id = config.get(CONF_ENTITY_ID)
below = config.get(CONF_BELOW)
above = config.get(CONF_ABOVE)
time_delta = config.get(CONF_FOR)
value_template = config.get(CONF_VALUE_TEMPLATE)
async_remove_track_same = None
if value_template is not None:
value_template.hass = hass
@callback
def state_automation_listener(entity, from_s, to_s):
"""Listen for state changes and calls action."""
def check_numeric_state(entity, from_s, to_s):
"""Return True if they should trigger."""
if to_s is None:
return
return False
variables = {
'trigger': {
@ -55,17 +60,56 @@ def async_trigger(hass, config, action):
# If new one doesn't match, nothing to do
if not condition.async_numeric_state(
hass, to_s, below, above, value_template, variables):
return False
return True
@callback
def state_automation_listener(entity, from_s, to_s):
"""Listen for state changes and calls action."""
nonlocal async_remove_track_same
if not check_numeric_state(entity, from_s, to_s):
return
variables = {
'trigger': {
'platform': 'numeric_state',
'entity_id': entity,
'below': below,
'above': above,
'from_state': from_s,
'to_state': to_s,
}
}
# Only match if old didn't exist or existed but didn't match
# Written as: skip if old one did exist and matched
if from_s is not None and condition.async_numeric_state(
hass, from_s, below, above, value_template, variables):
return
variables['trigger']['from_state'] = from_s
variables['trigger']['to_state'] = to_s
@callback
def call_action():
"""Call action with right context."""
hass.async_run_job(action, variables)
hass.async_run_job(action, variables)
if not time_delta:
call_action()
return
return async_track_state_change(hass, entity_id, state_automation_listener)
async_remove_track_same = async_track_same_state(
hass, True, time_delta, call_action, entity_ids=entity_id,
async_check_func=check_numeric_state)
unsub = async_track_state_change(
hass, entity_id, state_automation_listener)
@callback
def async_remove():
"""Remove state listeners async."""
unsub()
if async_remove_track_same:
async_remove_track_same() # pylint: disable=not-callable
return async_remove

View File

@ -8,28 +8,23 @@ import asyncio
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.util.dt as dt_util
from homeassistant.const import MATCH_ALL, CONF_PLATFORM
from homeassistant.const import MATCH_ALL, CONF_PLATFORM, CONF_FOR
from homeassistant.helpers.event import (
async_track_state_change, async_track_point_in_utc_time)
async_track_state_change, async_track_same_state)
import homeassistant.helpers.config_validation as cv
CONF_ENTITY_ID = 'entity_id'
CONF_FROM = 'from'
CONF_TO = 'to'
CONF_FOR = 'for'
TRIGGER_SCHEMA = vol.All(
vol.Schema({
vol.Required(CONF_PLATFORM): 'state',
vol.Required(CONF_ENTITY_ID): cv.entity_ids,
# These are str on purpose. Want to catch YAML conversions
CONF_FROM: str,
CONF_TO: str,
CONF_FOR: vol.All(cv.time_period, cv.positive_timedelta),
}),
cv.key_dependency(CONF_FOR, CONF_TO),
)
TRIGGER_SCHEMA = vol.All(vol.Schema({
vol.Required(CONF_PLATFORM): 'state',
vol.Required(CONF_ENTITY_ID): cv.entity_ids,
# These are str on purpose. Want to catch YAML conversions
vol.Optional(CONF_FROM): str,
vol.Optional(CONF_TO): str,
vol.Optional(CONF_FOR): vol.All(cv.time_period, cv.positive_timedelta),
}), cv.key_dependency(CONF_FOR, CONF_TO))
@asyncio.coroutine
@ -39,28 +34,15 @@ def async_trigger(hass, config, action):
from_state = config.get(CONF_FROM, MATCH_ALL)
to_state = config.get(CONF_TO, MATCH_ALL)
time_delta = config.get(CONF_FOR)
async_remove_state_for_cancel = None
async_remove_state_for_listener = None
match_all = (from_state == MATCH_ALL and to_state == MATCH_ALL)
@callback
def clear_listener():
"""Clear all unsub listener."""
nonlocal async_remove_state_for_cancel, async_remove_state_for_listener
# pylint: disable=not-callable
if async_remove_state_for_listener is not None:
async_remove_state_for_listener()
async_remove_state_for_listener = None
if async_remove_state_for_cancel is not None:
async_remove_state_for_cancel()
async_remove_state_for_cancel = None
async_remove_track_same = None
@callback
def state_automation_listener(entity, from_s, to_s):
"""Listen for state changes and calls action."""
nonlocal async_remove_state_for_cancel, async_remove_state_for_listener
nonlocal async_remove_track_same
@callback
def call_action():
"""Call action with right context."""
hass.async_run_job(action, {
@ -78,33 +60,12 @@ def async_trigger(hass, config, action):
from_s.last_changed == to_s.last_changed):
return
if time_delta is None:
if not time_delta:
call_action()
return
@callback
def state_for_listener(now):
"""Fire on state changes after a delay and calls action."""
nonlocal async_remove_state_for_listener
async_remove_state_for_listener = None
clear_listener()
call_action()
@callback
def state_for_cancel_listener(entity, inner_from_s, inner_to_s):
"""Fire on changes and cancel for listener if changed."""
if inner_to_s.state == to_s.state:
return
clear_listener()
# cleanup previous listener
clear_listener()
async_remove_state_for_listener = async_track_point_in_utc_time(
hass, state_for_listener, dt_util.utcnow() + time_delta)
async_remove_state_for_cancel = async_track_state_change(
hass, entity, state_for_cancel_listener)
async_remove_track_same = async_track_same_state(
hass, to_s.state, time_delta, call_action, entity_ids=entity_id)
unsub = async_track_state_change(
hass, entity_id, state_automation_listener, from_state, to_state)
@ -113,6 +74,7 @@ def async_trigger(hass, config, action):
def async_remove():
"""Remove state listeners async."""
unsub()
clear_listener()
if async_remove_track_same:
async_remove_track_same() # pylint: disable=not-callable
return async_remove

View File

@ -19,16 +19,24 @@ from homeassistant.const import (
from homeassistant.exceptions import TemplateError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import async_generate_entity_id
from homeassistant.helpers.event import async_track_state_change
from homeassistant.helpers.event import (
async_track_state_change, async_track_same_state)
from homeassistant.helpers.restore_state import async_get_last_state
_LOGGER = logging.getLogger(__name__)
CONF_DELAY_ON = 'delay_on'
CONF_DELAY_OFF = 'delay_off'
SENSOR_SCHEMA = vol.Schema({
vol.Required(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(ATTR_FRIENDLY_NAME): cv.string,
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_DELAY_ON):
vol.All(cv.time_period, cv.positive_timedelta),
vol.Optional(CONF_DELAY_OFF):
vol.All(cv.time_period, cv.positive_timedelta),
})
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
@ -47,6 +55,8 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
value_template.extract_entities())
friendly_name = device_config.get(ATTR_FRIENDLY_NAME, device)
device_class = device_config.get(CONF_DEVICE_CLASS)
delay_on = device_config.get(CONF_DELAY_ON)
delay_off = device_config.get(CONF_DELAY_OFF)
if value_template is not None:
value_template.hass = hass
@ -54,13 +64,13 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
sensors.append(
BinarySensorTemplate(
hass, device, friendly_name, device_class, value_template,
entity_ids)
entity_ids, delay_on, delay_off)
)
if not sensors:
_LOGGER.error("No sensors added")
return False
async_add_devices(sensors, True)
async_add_devices(sensors)
return True
@ -68,7 +78,7 @@ class BinarySensorTemplate(BinarySensorDevice):
"""A virtual binary sensor that triggers from another sensor."""
def __init__(self, hass, device, friendly_name, device_class,
value_template, entity_ids):
value_template, entity_ids, delay_on, delay_off):
"""Initialize the Template binary sensor."""
self.hass = hass
self.entity_id = async_generate_entity_id(
@ -78,6 +88,8 @@ class BinarySensorTemplate(BinarySensorDevice):
self._template = value_template
self._state = None
self._entities = entity_ids
self._delay_on = delay_on
self._delay_off = delay_off
@asyncio.coroutine
def async_added_to_hass(self):
@ -89,7 +101,7 @@ class BinarySensorTemplate(BinarySensorDevice):
@callback
def template_bsensor_state_listener(entity, old_state, new_state):
"""Handle the target device state changes."""
self.hass.async_add_job(self.async_update_ha_state(True))
self.async_check_state()
@callback
def template_bsensor_startup(event):
@ -97,7 +109,7 @@ class BinarySensorTemplate(BinarySensorDevice):
async_track_state_change(
self.hass, self._entities, template_bsensor_state_listener)
self.hass.async_add_job(self.async_update_ha_state(True))
self.hass.async_add_job(self.async_check_state)
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_START, template_bsensor_startup)
@ -122,11 +134,11 @@ class BinarySensorTemplate(BinarySensorDevice):
"""No polling needed."""
return False
@asyncio.coroutine
def async_update(self):
"""Update the state from the template."""
@callback
def _async_render(self, *args):
"""Get the state of template."""
try:
self._state = self._template.async_render().lower() == 'true'
return self._template.async_render().lower() == 'true'
except TemplateError as ex:
if ex.args and ex.args[0].startswith(
"UndefinedError: 'None' has no attribute"):
@ -135,4 +147,29 @@ class BinarySensorTemplate(BinarySensorDevice):
"the state is unknown", self._name)
return
_LOGGER.error("Could not render template %s: %s", self._name, ex)
self._state = False
@callback
def async_check_state(self):
"""Update the state from the template."""
state = self._async_render()
# return if the state don't change or is invalid
if state is None or state == self.state:
return
@callback
def set_state():
"""Set state of template binary sensor."""
self._state = state
self.hass.async_add_job(self.async_update_ha_state())
# state without delay
if (state and not self._delay_on) or \
(not state and not self._delay_off):
set_state()
return
period = self._delay_on if state else self._delay_off
async_track_same_state(
self.hass, state, period, set_state, entity_ids=self._entities,
async_check_func=self._async_render)

View File

@ -101,6 +101,7 @@ CONF_EVENT = 'event'
CONF_EXCLUDE = 'exclude'
CONF_FILE_PATH = 'file_path'
CONF_FILENAME = 'filename'
CONF_FOR = 'for'
CONF_FRIENDLY_NAME = 'friendly_name'
CONF_HEADERS = 'headers'
CONF_HOST = 'host'

View File

@ -113,6 +113,62 @@ def async_track_template(hass, template, action, variables=None):
track_template = threaded_listener_factory(async_track_template)
@callback
def async_track_same_state(hass, orig_value, period, action,
async_check_func=None, entity_ids=MATCH_ALL):
"""Track the state of entities for a period and run a action.
If async_check_func is None it use the state of orig_value.
Without entity_ids we track all state changes.
"""
async_remove_state_for_cancel = None
async_remove_state_for_listener = None
@callback
def clear_listener():
"""Clear all unsub listener."""
nonlocal async_remove_state_for_cancel, async_remove_state_for_listener
# pylint: disable=not-callable
if async_remove_state_for_listener is not None:
async_remove_state_for_listener()
async_remove_state_for_listener = None
if async_remove_state_for_cancel is not None:
async_remove_state_for_cancel()
async_remove_state_for_cancel = None
@callback
def state_for_listener(now):
"""Fire on state changes after a delay and calls action."""
nonlocal async_remove_state_for_listener
async_remove_state_for_listener = None
clear_listener()
hass.async_run_job(action)
@callback
def state_for_cancel_listener(entity, from_state, to_state):
"""Fire on changes and cancel for listener if changed."""
if async_check_func:
value = async_check_func(entity, from_state, to_state)
else:
value = to_state.state
if orig_value == value:
return
clear_listener()
async_remove_state_for_listener = async_track_point_in_utc_time(
hass, state_for_listener, dt_util.utcnow() + period)
async_remove_state_for_cancel = async_track_state_change(
hass, entity_ids, state_for_cancel_listener)
return clear_listener
track_same_state = threaded_listener_factory(async_track_same_state)
@callback
def async_track_point_in_time(hass, action, point_in_time):
"""Add a listener that fires once after a specific point in time."""

View File

@ -1,11 +1,16 @@
"""The tests for numeric state automation."""
from datetime import timedelta
import unittest
from unittest.mock import patch
import homeassistant.components.automation as automation
from homeassistant.core import callback
from homeassistant.setup import setup_component
import homeassistant.components.automation as automation
import homeassistant.util.dt as dt_util
from tests.common import get_test_home_assistant, mock_component
from tests.common import (
get_test_home_assistant, mock_component, fire_time_changed,
assert_setup_component)
# pylint: disable=invalid-name
@ -576,3 +581,126 @@ class TestAutomationNumericState(unittest.TestCase):
self.hass.block_till_done()
self.assertEqual(2, len(self.calls))
def test_if_fails_setup_bad_for(self):
"""Test for setup failure for bad for."""
with assert_setup_component(0):
assert setup_component(self.hass, automation.DOMAIN, {
automation.DOMAIN: {
'trigger': {
'platform': 'numeric_state',
'entity_id': 'test.entity',
'above': 8,
'below': 12,
'for': {
'invalid': 5
},
},
'action': {
'service': 'homeassistant.turn_on',
}
}})
def test_if_fails_setup_for_without_above_below(self):
"""Test for setup failures for missing above or below."""
with assert_setup_component(0):
assert setup_component(self.hass, automation.DOMAIN, {
automation.DOMAIN: {
'trigger': {
'platform': 'numeric_state',
'entity_id': 'test.entity',
'for': {
'seconds': 5
},
},
'action': {
'service': 'homeassistant.turn_on',
}
}})
def test_if_not_fires_on_entity_change_with_for(self):
"""Test for not firing on entity change with for."""
assert setup_component(self.hass, automation.DOMAIN, {
automation.DOMAIN: {
'trigger': {
'platform': 'numeric_state',
'entity_id': 'test.entity',
'above': 8,
'below': 12,
'for': {
'seconds': 5
},
},
'action': {
'service': 'test.automation'
}
}
})
self.hass.states.set('test.entity', 9)
self.hass.block_till_done()
self.hass.states.set('test.entity', 15)
self.hass.block_till_done()
fire_time_changed(self.hass, dt_util.utcnow() + timedelta(seconds=10))
self.hass.block_till_done()
self.assertEqual(0, len(self.calls))
def test_if_fires_on_entity_change_with_for_attribute_change(self):
"""Test for firing on entity change with for and attribute change."""
assert setup_component(self.hass, automation.DOMAIN, {
automation.DOMAIN: {
'trigger': {
'platform': 'numeric_state',
'entity_id': 'test.entity',
'above': 8,
'below': 12,
'for': {
'seconds': 5
},
},
'action': {
'service': 'test.automation'
}
}
})
utcnow = dt_util.utcnow()
with patch('homeassistant.core.dt_util.utcnow') as mock_utcnow:
mock_utcnow.return_value = utcnow
self.hass.states.set('test.entity', 9)
self.hass.block_till_done()
mock_utcnow.return_value += timedelta(seconds=4)
fire_time_changed(self.hass, mock_utcnow.return_value)
self.hass.states.set('test.entity', 9,
attributes={"mock_attr": "attr_change"})
self.hass.block_till_done()
self.assertEqual(0, len(self.calls))
mock_utcnow.return_value += timedelta(seconds=4)
fire_time_changed(self.hass, mock_utcnow.return_value)
self.hass.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_fires_on_entity_change_with_for(self):
"""Test for firing on entity change with for."""
assert setup_component(self.hass, automation.DOMAIN, {
automation.DOMAIN: {
'trigger': {
'platform': 'numeric_state',
'entity_id': 'test.entity',
'above': 8,
'below': 12,
'for': {
'seconds': 5
},
},
'action': {
'service': 'test.automation'
}
}
})
self.hass.states.set('test.entity', 9)
self.hass.block_till_done()
fire_time_changed(self.hass, dt_util.utcnow() + timedelta(seconds=10))
self.hass.block_till_done()
self.assertEqual(1, len(self.calls))

View File

@ -1,5 +1,6 @@
"""The tests for the Template Binary sensor platform."""
import asyncio
from datetime import timedelta
import unittest
from unittest import mock
@ -10,10 +11,12 @@ from homeassistant.components.binary_sensor import template
from homeassistant.exceptions import TemplateError
from homeassistant.helpers import template as template_hlpr
from homeassistant.util.async import run_callback_threadsafe
import homeassistant.util.dt as dt_util
from homeassistant.helpers.restore_state import DATA_RESTORE_CACHE
from tests.common import (
get_test_home_assistant, assert_setup_component, mock_component)
get_test_home_assistant, assert_setup_component, mock_component,
async_fire_time_changed)
class TestBinarySensorTemplate(unittest.TestCase):
@ -103,19 +106,20 @@ class TestBinarySensorTemplate(unittest.TestCase):
vs = run_callback_threadsafe(
self.hass.loop, template.BinarySensorTemplate,
self.hass, 'parent', 'Parent', 'motion',
template_hlpr.Template('{{ 1 > 1 }}', self.hass), MATCH_ALL
template_hlpr.Template('{{ 1 > 1 }}', self.hass), MATCH_ALL,
None, None
).result()
self.assertFalse(vs.should_poll)
self.assertEqual('motion', vs.device_class)
self.assertEqual('Parent', vs.name)
vs.update()
run_callback_threadsafe(self.hass.loop, vs.async_check_state).result()
self.assertFalse(vs.is_on)
# pylint: disable=protected-access
vs._template = template_hlpr.Template("{{ 2 > 1 }}", self.hass)
vs.update()
run_callback_threadsafe(self.hass.loop, vs.async_check_state).result()
self.assertTrue(vs.is_on)
def test_event(self):
@ -155,13 +159,14 @@ class TestBinarySensorTemplate(unittest.TestCase):
vs = run_callback_threadsafe(
self.hass.loop, template.BinarySensorTemplate,
self.hass, 'parent', 'Parent', 'motion',
template_hlpr.Template('{{ 1 > 1 }}', self.hass), MATCH_ALL
template_hlpr.Template('{{ 1 > 1 }}', self.hass), MATCH_ALL,
None, None
).result()
mock_render.side_effect = TemplateError('foo')
vs.update()
run_callback_threadsafe(self.hass.loop, vs.async_check_state).result()
mock_render.side_effect = TemplateError(
"UndefinedError: 'None' has no attribute")
vs.update()
run_callback_threadsafe(self.hass.loop, vs.async_check_state).result()
@asyncio.coroutine
@ -197,3 +202,124 @@ def test_restore_state(hass):
state = hass.states.get('binary_sensor.test')
assert state.state == 'off'
@asyncio.coroutine
def test_template_delay_on(hass):
"""Test binary sensor template delay on."""
config = {
'binary_sensor': {
'platform': 'template',
'sensors': {
'test': {
'friendly_name': 'virtual thingy',
'value_template':
"{{ states.sensor.test_state.state == 'on' }}",
'device_class': 'motion',
'delay_on': 5
},
},
},
}
yield from setup.async_setup_component(hass, 'binary_sensor', config)
yield from hass.async_start()
hass.states.async_set('sensor.test_state', 'on')
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'off'
future = dt_util.utcnow() + timedelta(seconds=5)
async_fire_time_changed(hass, future)
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'on'
# check with time changes
hass.states.async_set('sensor.test_state', 'off')
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'off'
hass.states.async_set('sensor.test_state', 'on')
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'off'
hass.states.async_set('sensor.test_state', 'off')
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'off'
future = dt_util.utcnow() + timedelta(seconds=5)
async_fire_time_changed(hass, future)
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'off'
@asyncio.coroutine
def test_template_delay_off(hass):
"""Test binary sensor template delay off."""
config = {
'binary_sensor': {
'platform': 'template',
'sensors': {
'test': {
'friendly_name': 'virtual thingy',
'value_template':
"{{ states.sensor.test_state.state == 'on' }}",
'device_class': 'motion',
'delay_off': 5
},
},
},
}
hass.states.async_set('sensor.test_state', 'on')
yield from setup.async_setup_component(hass, 'binary_sensor', config)
yield from hass.async_start()
hass.states.async_set('sensor.test_state', 'off')
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'on'
future = dt_util.utcnow() + timedelta(seconds=5)
async_fire_time_changed(hass, future)
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'off'
# check with time changes
hass.states.async_set('sensor.test_state', 'on')
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'on'
hass.states.async_set('sensor.test_state', 'off')
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'on'
hass.states.async_set('sensor.test_state', 'on')
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'on'
future = dt_util.utcnow() + timedelta(seconds=5)
async_fire_time_changed(hass, future)
yield from hass.async_block_till_done()
state = hass.states.get('binary_sensor.test')
assert state.state == 'on'

View File

@ -17,6 +17,7 @@ from homeassistant.helpers.event import (
track_state_change,
track_time_interval,
track_template,
track_same_state,
track_sunrise,
track_sunset,
)
@ -24,7 +25,7 @@ from homeassistant.helpers.template import Template
from homeassistant.components import sun
import homeassistant.util.dt as dt_util
from tests.common import get_test_home_assistant
from tests.common import get_test_home_assistant, fire_time_changed
from unittest.mock import patch
@ -262,6 +263,111 @@ class TestEventHelpers(unittest.TestCase):
self.assertEqual(2, len(wildcard_runs))
self.assertEqual(2, len(wildercard_runs))
def test_track_same_state_simple_trigger(self):
"""Test track_same_change with trigger simple."""
thread_runs = []
callback_runs = []
coroutine_runs = []
period = timedelta(minutes=1)
def thread_run_callback():
thread_runs.append(1)
track_same_state(
self.hass, 'on', period, thread_run_callback,
entity_ids='light.Bowl')
@ha.callback
def callback_run_callback():
callback_runs.append(1)
track_same_state(
self.hass, 'on', period, callback_run_callback,
entity_ids='light.Bowl')
@asyncio.coroutine
def coroutine_run_callback():
coroutine_runs.append(1)
track_same_state(
self.hass, 'on', period, coroutine_run_callback)
# Adding state to state machine
self.hass.states.set("light.Bowl", "on")
self.hass.block_till_done()
self.assertEqual(0, len(thread_runs))
self.assertEqual(0, len(callback_runs))
self.assertEqual(0, len(coroutine_runs))
# change time to track and see if they trigger
future = dt_util.utcnow() + period
fire_time_changed(self.hass, future)
self.hass.block_till_done()
self.assertEqual(1, len(thread_runs))
self.assertEqual(1, len(callback_runs))
self.assertEqual(1, len(coroutine_runs))
def test_track_same_state_simple_no_trigger(self):
"""Test track_same_change with no trigger."""
callback_runs = []
period = timedelta(minutes=1)
@ha.callback
def callback_run_callback():
callback_runs.append(1)
track_same_state(
self.hass, 'on', period, callback_run_callback,
entity_ids='light.Bowl')
# Adding state to state machine
self.hass.states.set("light.Bowl", "on")
self.hass.block_till_done()
self.assertEqual(0, len(callback_runs))
# Change state on state machine
self.hass.states.set("light.Bowl", "off")
self.hass.block_till_done()
self.assertEqual(0, len(callback_runs))
# change time to track and see if they trigger
future = dt_util.utcnow() + period
fire_time_changed(self.hass, future)
self.hass.block_till_done()
self.assertEqual(0, len(callback_runs))
def test_track_same_state_simple_trigger_check_funct(self):
"""Test track_same_change with trigger and check funct."""
callback_runs = []
check_func = []
period = timedelta(minutes=1)
@ha.callback
def callback_run_callback():
callback_runs.append(1)
@ha.callback
def async_check_func(entity, from_s, to_s):
check_func.append((entity, from_s, to_s))
return 'on'
track_same_state(
self.hass, 'on', period, callback_run_callback,
entity_ids='light.Bowl', async_check_func=async_check_func)
# Adding state to state machine
self.hass.states.set("light.Bowl", "on")
self.hass.block_till_done()
self.assertEqual(0, len(callback_runs))
self.assertEqual('on', check_func[-1][2].state)
self.assertEqual('light.bowl', check_func[-1][0])
# change time to track and see if they trigger
future = dt_util.utcnow() + period
fire_time_changed(self.hass, future)
self.hass.block_till_done()
self.assertEqual(1, len(callback_runs))
def test_track_time_interval(self):
"""Test tracking time interval."""
specific_runs = []