Merge pull request #1353 from balloob/assume-state

Add assumed_state to Group, MQTT Switch, MQTT Light
This commit is contained in:
Paulus Schoutsen 2016-02-21 09:19:54 -08:00
commit 070cee48a8
6 changed files with 160 additions and 55 deletions

View File

@ -9,7 +9,8 @@ https://home-assistant.io/components/group/
import homeassistant.core as ha import homeassistant.core as ha
from homeassistant.const import ( from homeassistant.const import (
ATTR_ENTITY_ID, CONF_ICON, CONF_NAME, STATE_CLOSED, STATE_HOME, ATTR_ENTITY_ID, CONF_ICON, CONF_NAME, STATE_CLOSED, STATE_HOME,
STATE_NOT_HOME, STATE_OFF, STATE_ON, STATE_OPEN, STATE_UNKNOWN) STATE_NOT_HOME, STATE_OFF, STATE_ON, STATE_OPEN, STATE_UNKNOWN,
ATTR_ASSUMED_STATE, )
from homeassistant.helpers.entity import ( from homeassistant.helpers.entity import (
Entity, generate_entity_id, split_entity_id) Entity, generate_entity_id, split_entity_id)
from homeassistant.helpers.event import track_state_change from homeassistant.helpers.event import track_state_change
@ -144,6 +145,7 @@ class Group(Entity):
self.tracking = [] self.tracking = []
self.group_on = None self.group_on = None
self.group_off = None self.group_off = None
self._assumed_state = False
if entity_ids is not None: if entity_ids is not None:
self.update_tracked_entity_ids(entity_ids) self.update_tracked_entity_ids(entity_ids)
@ -182,6 +184,11 @@ class Group(Entity):
data[ATTR_VIEW] = True data[ATTR_VIEW] = True
return data return data
@property
def assumed_state(self):
"""Return True if unable to access real state of entity."""
return self._assumed_state
def update_tracked_entity_ids(self, entity_ids): def update_tracked_entity_ids(self, entity_ids):
""" Update the tracked entity IDs. """ """ Update the tracked entity IDs. """
self.stop() self.stop()
@ -207,47 +214,77 @@ class Group(Entity):
def update(self): def update(self):
""" Query all the tracked states and determine current group state. """ """ Query all the tracked states and determine current group state. """
self._state = STATE_UNKNOWN self._state = STATE_UNKNOWN
self._update_group_state()
def _state_changed_listener(self, entity_id, old_state, new_state):
""" Listener to receive state changes of tracked entities. """
self._update_group_state(new_state)
self.update_ha_state()
@property
def _tracking_states(self):
"""States that the group is tracking."""
states = []
for entity_id in self.tracking: for entity_id in self.tracking:
state = self.hass.states.get(entity_id) state = self.hass.states.get(entity_id)
if state is not None: if state is not None:
self._process_tracked_state(state) states.append(state)
def _state_changed_listener(self, entity_id, old_state, new_state): return states
""" Listener to receive state changes of tracked entities. """
self._process_tracked_state(new_state)
self.update_ha_state()
def _process_tracked_state(self, tr_state): def _update_group_state(self, tr_state=None):
""" Updates group state based on a new state of a tracked entity. """ """Update group state.
Optionally you can provide the only state changed since last update
allowing this method to take shortcuts.
"""
# pylint: disable=too-many-branches
# To store current states of group entities. Might not be needed.
states = None
gr_state, gr_on, gr_off = self._state, self.group_on, self.group_off
# We have not determined type of group yet # We have not determined type of group yet
if self.group_on is None: if gr_on is None:
self.group_on, self.group_off = _get_group_on_off(tr_state.state) if tr_state is None:
states = self._tracking_states
if self.group_on is not None: for state in states:
# New state of the group is going to be based on the first gr_on, gr_off = \
# state that we can recognize _get_group_on_off(state.state)
self._state = tr_state.state if gr_on is not None:
break
else:
gr_on, gr_off = _get_group_on_off(tr_state.state)
if gr_on is not None:
self.group_on, self.group_off = gr_on, gr_off
# We cannot determine state of the group
if gr_on is None:
return return
# There is already a group state if tr_state is None or (gr_state == gr_on and
cur_gr_state = self._state tr_state.state == gr_off):
group_on, group_off = self.group_on, self.group_off if states is None:
states = self._tracking_states
# if cur_gr_state = OFF and tr_state = ON: set ON if any(state.state == gr_on for state in states):
# if cur_gr_state = ON and tr_state = OFF: research self._state = gr_on
# else: ignore else:
self._state = gr_off
if cur_gr_state == group_off and tr_state.state == group_on: elif tr_state.state in (gr_on, gr_off):
self._state = group_on self._state = tr_state.state
elif cur_gr_state == group_on and tr_state.state == group_off: if tr_state is None or self._assumed_state and \
not tr_state.attributes.get(ATTR_ASSUMED_STATE):
if states is None:
states = self._tracking_states
# Set to off if no other states are on self._assumed_state = any(state.attributes.get(ATTR_ASSUMED_STATE)
if not any(self.hass.states.is_state(ent_id, group_on) for state in states)
for ent_id in self.tracking
if tr_state.entity_id != ent_id): elif tr_state.attributes.get(ATTR_ASSUMED_STATE):
self._state = group_off self._assumed_state = True

View File

@ -134,6 +134,11 @@ class MqttLight(Light):
""" True if device is on. """ """ True if device is on. """
return self._state return self._state
@property
def assumed_state(self):
"""Return True if we do optimistic updates."""
return self._optimistic
def turn_on(self, **kwargs): def turn_on(self, **kwargs):
""" Turn the device on. """ """ Turn the device on. """
should_update = False should_update = False

View File

@ -97,6 +97,11 @@ class MqttSwitch(SwitchDevice):
""" True if device is on. """ """ True if device is on. """
return self._state return self._state
@property
def assumed_state(self):
"""Return True if we do optimistic updates."""
return self._optimistic
def turn_on(self, **kwargs): def turn_on(self, **kwargs):
""" Turn the device on. """ """ Turn the device on. """
mqtt.publish(self.hass, self._command_topic, self._payload_on, mqtt.publish(self.hass, self._command_topic, self._payload_on,

View File

@ -45,7 +45,7 @@ light:
""" """
import unittest import unittest
from homeassistant.const import STATE_ON, STATE_OFF from homeassistant.const import STATE_ON, STATE_OFF, ATTR_ASSUMED_STATE
import homeassistant.components.light as light import homeassistant.components.light as light
from tests.common import ( from tests.common import (
get_test_home_assistant, mock_mqtt_component, fire_mqtt_message) get_test_home_assistant, mock_mqtt_component, fire_mqtt_message)
@ -115,6 +115,7 @@ class TestLightMQTT(unittest.TestCase):
self.assertEqual(STATE_OFF, state.state) self.assertEqual(STATE_OFF, state.state)
self.assertIsNone(state.attributes.get('rgb_color')) self.assertIsNone(state.attributes.get('rgb_color'))
self.assertIsNone(state.attributes.get('brightness')) self.assertIsNone(state.attributes.get('brightness'))
self.assertIsNone(state.attributes.get(ATTR_ASSUMED_STATE))
fire_mqtt_message(self.hass, 'test_light_rgb/status', 'on') fire_mqtt_message(self.hass, 'test_light_rgb/status', 'on')
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
@ -201,6 +202,7 @@ class TestLightMQTT(unittest.TestCase):
state = self.hass.states.get('light.test') state = self.hass.states.get('light.test')
self.assertEqual(STATE_OFF, state.state) self.assertEqual(STATE_OFF, state.state)
self.assertTrue(state.attributes.get(ATTR_ASSUMED_STATE))
light.turn_on(self.hass, 'light.test') light.turn_on(self.hass, 'light.test')
self.hass.pool.block_till_done() self.hass.pool.block_till_done()

View File

@ -6,7 +6,7 @@ Tests MQTT switch.
""" """
import unittest import unittest
from homeassistant.const import STATE_ON, STATE_OFF from homeassistant.const import STATE_ON, STATE_OFF, ATTR_ASSUMED_STATE
import homeassistant.components.switch as switch import homeassistant.components.switch as switch
from tests.common import ( from tests.common import (
mock_mqtt_component, fire_mqtt_message, get_test_home_assistant) mock_mqtt_component, fire_mqtt_message, get_test_home_assistant)
@ -37,6 +37,7 @@ class TestSensorMQTT(unittest.TestCase):
state = self.hass.states.get('switch.test') state = self.hass.states.get('switch.test')
self.assertEqual(STATE_OFF, state.state) self.assertEqual(STATE_OFF, state.state)
self.assertIsNone(state.attributes.get(ATTR_ASSUMED_STATE))
fire_mqtt_message(self.hass, 'state-topic', 'beer on') fire_mqtt_message(self.hass, 'state-topic', 'beer on')
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
@ -64,6 +65,7 @@ class TestSensorMQTT(unittest.TestCase):
state = self.hass.states.get('switch.test') state = self.hass.states.get('switch.test')
self.assertEqual(STATE_OFF, state.state) self.assertEqual(STATE_OFF, state.state)
self.assertTrue(state.attributes.get(ATTR_ASSUMED_STATE))
switch.turn_on(self.hass, 'switch.test') switch.turn_on(self.hass, 'switch.test')
self.hass.pool.block_till_done() self.hass.pool.block_till_done()

View File

@ -8,7 +8,8 @@ Tests the group compoments.
import unittest import unittest
from homeassistant.const import ( from homeassistant.const import (
STATE_ON, STATE_OFF, STATE_HOME, STATE_UNKNOWN, ATTR_ICON, ATTR_HIDDEN) STATE_ON, STATE_OFF, STATE_HOME, STATE_UNKNOWN, ATTR_ICON, ATTR_HIDDEN,
ATTR_ASSUMED_STATE, )
import homeassistant.components.group as group import homeassistant.components.group as group
from tests.common import get_test_home_assistant from tests.common import get_test_home_assistant
@ -21,19 +22,13 @@ class TestComponentsGroup(unittest.TestCase):
""" Init needed objects. """ """ Init needed objects. """
self.hass = get_test_home_assistant() self.hass = get_test_home_assistant()
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.group_entity_id = test_group.entity_id
def tearDown(self): # pylint: disable=invalid-name def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """ """ Stop down stuff we started. """
self.hass.stop() self.hass.stop()
def test_setup_group_with_mixed_groupable_states(self): def test_setup_group_with_mixed_groupable_states(self):
""" Try to setup a group with mixed groupable states """ """ Try to setup a group with mixed groupable states """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('device_tracker.Paulus', STATE_HOME) self.hass.states.set('device_tracker.Paulus', STATE_HOME)
group.Group( group.Group(
self.hass, 'person_and_light', self.hass, 'person_and_light',
@ -46,6 +41,8 @@ class TestComponentsGroup(unittest.TestCase):
def test_setup_group_with_a_non_existing_state(self): def test_setup_group_with_a_non_existing_state(self):
""" Try to setup a group with a non existing state """ """ Try to setup a group with a non existing state """
self.hass.states.set('light.Bowl', STATE_ON)
grp = group.Group( grp = group.Group(
self.hass, 'light_and_nothing', self.hass, 'light_and_nothing',
['light.Bowl', 'non.existing']) ['light.Bowl', 'non.existing'])
@ -70,11 +67,15 @@ class TestComponentsGroup(unittest.TestCase):
def test_monitor_group(self): def test_monitor_group(self):
""" Test if the group keeps track of states. """ """ Test if the group keeps track of states. """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
# Test if group setup in our init mode is ok # Test if group setup in our init mode is ok
self.assertIn(self.group_entity_id, self.hass.states.entity_ids()) self.assertIn(test_group.entity_id, self.hass.states.entity_ids())
group_state = self.hass.states.get(self.group_entity_id) group_state = self.hass.states.get(test_group.entity_id)
self.assertEqual(STATE_ON, group_state.state) self.assertEqual(STATE_ON, group_state.state)
self.assertTrue(group_state.attributes.get(group.ATTR_AUTO)) self.assertTrue(group_state.attributes.get(group.ATTR_AUTO))
@ -83,54 +84,73 @@ class TestComponentsGroup(unittest.TestCase):
Test if the group turns off if the last device that was on turns off. Test if the group turns off if the last device that was on turns off.
""" """
self.hass.states.set('light.Bowl', STATE_OFF) self.hass.states.set('light.Bowl', STATE_OFF)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
group_state = self.hass.states.get(self.group_entity_id) group_state = self.hass.states.get(test_group.entity_id)
self.assertEqual(STATE_OFF, group_state.state) self.assertEqual(STATE_OFF, group_state.state)
def test_group_turns_on_if_all_are_off_and_one_turns_on(self): def test_group_turns_on_if_all_are_off_and_one_turns_on(self):
""" """
Test if group turns on if all devices were turned off and one turns on. Test if group turns on if all devices were turned off and one turns on.
""" """
# Make sure all are off.
self.hass.states.set('light.Bowl', STATE_OFF) self.hass.states.set('light.Bowl', STATE_OFF)
self.hass.pool.block_till_done() self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
# Turn one on # Turn one on
self.hass.states.set('light.Ceiling', STATE_ON) self.hass.states.set('light.Ceiling', STATE_ON)
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
group_state = self.hass.states.get(self.group_entity_id) group_state = self.hass.states.get(test_group.entity_id)
self.assertEqual(STATE_ON, group_state.state) self.assertEqual(STATE_ON, group_state.state)
def test_is_on(self): def test_is_on(self):
""" Test is_on method. """ """ Test is_on method. """
self.assertTrue(group.is_on(self.hass, self.group_entity_id)) self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertTrue(group.is_on(self.hass, test_group.entity_id))
self.hass.states.set('light.Bowl', STATE_OFF) self.hass.states.set('light.Bowl', STATE_OFF)
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
self.assertFalse(group.is_on(self.hass, self.group_entity_id)) self.assertFalse(group.is_on(self.hass, test_group.entity_id))
# Try on non existing state # Try on non existing state
self.assertFalse(group.is_on(self.hass, 'non.existing')) self.assertFalse(group.is_on(self.hass, 'non.existing'))
def test_expand_entity_ids(self): def test_expand_entity_ids(self):
""" Test expand_entity_ids method. """ """ Test expand_entity_ids method. """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertEqual(sorted(['light.ceiling', 'light.bowl']), self.assertEqual(sorted(['light.ceiling', 'light.bowl']),
sorted(group.expand_entity_ids( sorted(group.expand_entity_ids(
self.hass, [self.group_entity_id]))) self.hass, [test_group.entity_id])))
def test_expand_entity_ids_does_not_return_duplicates(self): def test_expand_entity_ids_does_not_return_duplicates(self):
""" Test that expand_entity_ids does not return duplicates. """ """ Test that expand_entity_ids does not return duplicates. """
self.assertEqual( self.hass.states.set('light.Bowl', STATE_ON)
['light.bowl', 'light.ceiling'], self.hass.states.set('light.Ceiling', STATE_OFF)
sorted(group.expand_entity_ids( test_group = group.Group(
self.hass, [self.group_entity_id, 'light.Ceiling']))) self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertEqual( self.assertEqual(
['light.bowl', 'light.ceiling'], ['light.bowl', 'light.ceiling'],
sorted(group.expand_entity_ids( sorted(group.expand_entity_ids(
self.hass, ['light.bowl', self.group_entity_id]))) self.hass, [test_group.entity_id, 'light.Ceiling'])))
self.assertEqual(
['light.bowl', 'light.ceiling'],
sorted(group.expand_entity_ids(
self.hass, ['light.bowl', test_group.entity_id])))
def test_expand_entity_ids_ignores_non_strings(self): def test_expand_entity_ids_ignores_non_strings(self):
""" Test that non string elements in lists are ignored. """ """ Test that non string elements in lists are ignored. """
@ -138,9 +158,14 @@ class TestComponentsGroup(unittest.TestCase):
def test_get_entity_ids(self): def test_get_entity_ids(self):
""" Test get_entity_ids method. """ """ Test get_entity_ids method. """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertEqual( self.assertEqual(
['light.bowl', 'light.ceiling'], ['light.bowl', 'light.ceiling'],
sorted(group.get_entity_ids(self.hass, self.group_entity_id))) sorted(group.get_entity_ids(self.hass, test_group.entity_id)))
def test_get_entity_ids_with_domain_filter(self): def test_get_entity_ids_with_domain_filter(self):
""" Test if get_entity_ids works with a domain_filter. """ """ Test if get_entity_ids works with a domain_filter. """
@ -190,13 +215,18 @@ class TestComponentsGroup(unittest.TestCase):
def test_setup(self): def test_setup(self):
""" Test setup method. """ """ Test setup method. """
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group', ['light.Bowl', 'light.Ceiling'], False)
self.assertTrue( self.assertTrue(
group.setup( group.setup(
self.hass, self.hass,
{ {
group.DOMAIN: { group.DOMAIN: {
'second_group': { 'second_group': {
'entities': 'light.Bowl, ' + self.group_entity_id, 'entities': 'light.Bowl, ' + test_group.entity_id,
'icon': 'mdi:work', 'icon': 'mdi:work',
'view': True, 'view': True,
}, },
@ -207,7 +237,7 @@ class TestComponentsGroup(unittest.TestCase):
group_state = self.hass.states.get( group_state = self.hass.states.get(
group.ENTITY_ID_FORMAT.format('second_group')) group.ENTITY_ID_FORMAT.format('second_group'))
self.assertEqual(STATE_ON, group_state.state) self.assertEqual(STATE_ON, group_state.state)
self.assertEqual(set((self.group_entity_id, 'light.bowl')), self.assertEqual(set((test_group.entity_id, 'light.bowl')),
set(group_state.attributes['entity_id'])) set(group_state.attributes['entity_id']))
self.assertIsNone(group_state.attributes.get(group.ATTR_AUTO)) self.assertIsNone(group_state.attributes.get(group.ATTR_AUTO))
self.assertEqual('mdi:work', self.assertEqual('mdi:work',
@ -242,3 +272,27 @@ class TestComponentsGroup(unittest.TestCase):
['light.test_1', 'light.test_2', 'switch.test_1', 'switch.test_2'], ['light.test_1', 'light.test_2', 'switch.test_1', 'switch.test_2'],
sorted(group.expand_entity_ids(self.hass, sorted(group.expand_entity_ids(self.hass,
['group.group_of_groups']))) ['group.group_of_groups'])))
def test_set_assumed_state_based_on_tracked(self):
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.states.set('light.Ceiling', STATE_OFF)
test_group = group.Group(
self.hass, 'init_group',
['light.Bowl', 'light.Ceiling', 'sensor.no_exist'])
state = self.hass.states.get(test_group.entity_id)
self.assertIsNone(state.attributes.get(ATTR_ASSUMED_STATE))
self.hass.states.set('light.Bowl', STATE_ON, {
ATTR_ASSUMED_STATE: True
})
self.hass.pool.block_till_done()
state = self.hass.states.get(test_group.entity_id)
self.assertTrue(state.attributes.get(ATTR_ASSUMED_STATE))
self.hass.states.set('light.Bowl', STATE_ON)
self.hass.pool.block_till_done()
state = self.hass.states.get(test_group.entity_id)
self.assertIsNone(state.attributes.get(ATTR_ASSUMED_STATE))