Add tests for zwave network events (#7573)

This commit is contained in:
Adam Mills 2017-05-12 23:06:32 -07:00 committed by Paulus Schoutsen
parent 9c4bc2a47f
commit cfea4b17e3

View File

@ -320,6 +320,123 @@ def test_value_discovery_existing_entity(hass, mock_openzwave):
'current_temperature'] == 23.5
@asyncio.coroutine
def test_scene_activated(hass, mock_openzwave):
"""Test scene activated event."""
mock_receivers = []
def mock_connect(receiver, signal, *args, **kwargs):
if signal == MockNetwork.SIGNAL_SCENE_EVENT:
mock_receivers.append(receiver)
with patch('pydispatch.dispatcher.connect', new=mock_connect):
yield from async_setup_component(hass, 'zwave', {'zwave': {}})
assert len(mock_receivers) == 1
events = []
def listener(event):
events.append(event)
hass.bus.async_listen(const.EVENT_SCENE_ACTIVATED, listener)
node = MockNode(node_id=11)
scene_id = 123
hass.async_add_job(mock_receivers[0], node, scene_id)
yield from hass.async_block_till_done()
assert len(events) == 1
assert events[0].data[ATTR_ENTITY_ID] == "mock_node_11"
assert events[0].data[const.ATTR_OBJECT_ID] == "mock_node_11"
assert events[0].data[const.ATTR_SCENE_ID] == scene_id
@asyncio.coroutine
def test_node_event_activated(hass, mock_openzwave):
"""Test Node event activated event."""
mock_receivers = []
def mock_connect(receiver, signal, *args, **kwargs):
if signal == MockNetwork.SIGNAL_NODE_EVENT:
mock_receivers.append(receiver)
with patch('pydispatch.dispatcher.connect', new=mock_connect):
yield from async_setup_component(hass, 'zwave', {'zwave': {}})
assert len(mock_receivers) == 1
events = []
def listener(event):
events.append(event)
hass.bus.async_listen(const.EVENT_NODE_EVENT, listener)
node = MockNode(node_id=11)
value = 234
hass.async_add_job(mock_receivers[0], node, value)
yield from hass.async_block_till_done()
assert len(events) == 1
assert events[0].data[const.ATTR_OBJECT_ID] == "mock_node_11"
assert events[0].data[const.ATTR_BASIC_LEVEL] == value
@asyncio.coroutine
def test_network_ready(hass, mock_openzwave):
"""Test Node network ready event."""
mock_receivers = []
def mock_connect(receiver, signal, *args, **kwargs):
if signal == MockNetwork.SIGNAL_ALL_NODES_QUERIED:
mock_receivers.append(receiver)
with patch('pydispatch.dispatcher.connect', new=mock_connect):
yield from async_setup_component(hass, 'zwave', {'zwave': {}})
assert len(mock_receivers) == 1
events = []
def listener(event):
events.append(event)
hass.bus.async_listen(const.EVENT_NETWORK_COMPLETE, listener)
hass.async_add_job(mock_receivers[0])
yield from hass.async_block_till_done()
assert len(events) == 1
@asyncio.coroutine
def test_network_complete(hass, mock_openzwave):
"""Test Node network complete event."""
mock_receivers = []
def mock_connect(receiver, signal, *args, **kwargs):
if signal == MockNetwork.SIGNAL_AWAKE_NODES_QUERIED:
mock_receivers.append(receiver)
with patch('pydispatch.dispatcher.connect', new=mock_connect):
yield from async_setup_component(hass, 'zwave', {'zwave': {}})
assert len(mock_receivers) == 1
events = []
def listener(event):
events.append(event)
hass.bus.async_listen(const.EVENT_NETWORK_READY, listener)
hass.async_add_job(mock_receivers[0])
yield from hass.async_block_till_done()
assert len(events) == 1
class TestZWaveDeviceEntityValues(unittest.TestCase):
"""Tests for the ZWaveDeviceEntityValues helper."""