Switch tests to use hass objects instead of direct (#37530)

* Switch tests to use hass objects instead of direct

* Make sure sensor update state

* Add some initial binary sensor tests

* Add initial binary sensor tests

* Add tests for pt2262

* Add test for off delay
This commit is contained in:
Joakim Plate 2020-07-05 22:41:11 +02:00 committed by GitHub
parent 3ad59f877c
commit 01fd33f173
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 640 additions and 548 deletions

View File

@ -175,6 +175,8 @@ def get_rfx_object(packetid):
obj = rfxtrxmod.StatusEvent(pkt)
else:
obj = rfxtrxmod.ControlEvent(pkt)
obj.data = binarypacket
return obj

View File

@ -93,7 +93,6 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
entity.get(CONF_COMMAND_ON),
entity.get(CONF_COMMAND_OFF),
)
device.hass = hass
sensors.append(device)
RFX_DEVICES[device_id] = device
@ -121,7 +120,6 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
pkt_id = "".join(f"{x:02x}" for x in event.data)
sensor = RfxtrxBinarySensor(event, pkt_id)
sensor.hass = hass
RFX_DEVICES[device_id] = sensor
add_entities([sensor])
_LOGGER.info(
@ -271,4 +269,5 @@ class RfxtrxBinarySensor(BinarySensorEntity):
def update_state(self, state):
"""Update the state of the device."""
self._state = state
self.schedule_update_ha_state()
if self.hass:
self.schedule_update_ha_state()

View File

@ -92,6 +92,8 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
continue
sensor = sensors[data_type]
sensor.event = event
if sensor.hass:
sensor.schedule_update_ha_state()
# Fire event
if sensor.should_fire_event:
sensor.hass.bus.fire(

View File

@ -2,5 +2,8 @@
from homeassistant.components import rfxtrx
async def _signal_event(hass, event):
async def _signal_event(hass, packet_id):
event = rfxtrx.get_rfx_object(packet_id)
await hass.async_add_executor_job(rfxtrx.RECEIVED_EVT_SUBSCRIBERS[0], event)
await hass.async_block_till_done()
return event

View File

@ -68,7 +68,10 @@ async def rfxtrx_fixture(hass):
"""Stub out core rfxtrx to test platform."""
mock_component(hass, "rfxtrx")
yield
rfxobject = mock.MagicMock()
hass.data[rfxtrx_core.DATA_RFXOBJECT] = rfxobject
yield rfxobject
# These test don't listen for stop to do cleanup.
if rfxtrx_core.DATA_RFXOBJECT in hass.data:

View File

@ -0,0 +1,208 @@
"""The tests for the Rfxtrx sensor platform."""
from datetime import timedelta
from homeassistant.setup import async_setup_component
from homeassistant.util.dt import utcnow
from . import _signal_event
from tests.common import async_fire_time_changed
async def test_default_config(hass, rfxtrx):
"""Test with 0 sensor."""
await async_setup_component(
hass, "binary_sensor", {"binary_sensor": {"platform": "rfxtrx", "devices": {}}}
)
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 0
async def test_one(hass, rfxtrx):
"""Test with 1 sensor."""
await async_setup_component(
hass,
"binary_sensor",
{
"binary_sensor": {
"platform": "rfxtrx",
"devices": {"0a52080705020095220269": {"name": "Test"}},
}
},
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Test"
async def test_one_pt2262(hass, rfxtrx):
"""Test with 1 sensor."""
await async_setup_component(
hass,
"binary_sensor",
{
"binary_sensor": {
"platform": "rfxtrx",
"devices": {
"0913000022670e013970": {
"name": "Test",
"data_bits": 4,
"command_on": 0xE,
"command_off": 0x7,
}
},
}
},
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "off" # probably aught to be unknown
assert state.attributes.get("friendly_name") == "Test"
await _signal_event(hass, "0913000022670e013970")
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "on"
assert state.attributes.get("friendly_name") == "Test"
await _signal_event(hass, "09130000226707013d70")
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Test"
async def test_several(hass, rfxtrx):
"""Test with 3."""
await async_setup_component(
hass,
"binary_sensor",
{
"binary_sensor": {
"platform": "rfxtrx",
"devices": {
"0b1100cd0213c7f230010f71": {"name": "Test"},
"0b1100100118cdea02010f70": {"name": "Bath"},
"0b1100101118cdea02010f70": {"name": "Living"},
},
}
},
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Test"
state = hass.states.get("binary_sensor.bath")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Bath"
state = hass.states.get("binary_sensor.living")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Living"
async def test_discover(hass, rfxtrx):
"""Test with discovery."""
await async_setup_component(
hass,
"binary_sensor",
{"binary_sensor": {"platform": "rfxtrx", "automatic_add": True, "devices": {}}},
)
await hass.async_block_till_done()
await _signal_event(hass, "0b1100100118cdea02010f70")
state = hass.states.get("binary_sensor.0b1100100118cdea02010f70")
assert state
assert state.state == "on"
await _signal_event(hass, "0b1100100118cdeb02010f70")
state = hass.states.get("binary_sensor.0b1100100118cdeb02010f70")
assert state
assert state.state == "on"
# Trying to add a sensor
await _signal_event(hass, "0a52085e070100b31b0279")
state = hass.states.get("sensor.0a52085e070100b31b0279")
assert state is None
# Trying to add a light
await _signal_event(hass, "0b1100100118cdea02010f70")
state = hass.states.get("light.0b1100100118cdea02010f70")
assert state is None
# Trying to add a rollershutter
await _signal_event(hass, "0a1400adf394ab020e0060")
state = hass.states.get("cover.0a1400adf394ab020e0060")
assert state is None
async def test_discover_noautoadd(hass, rfxtrx):
"""Test with discovery of switch when auto add is False."""
await async_setup_component(
hass,
"binary_sensor",
{
"binary_sensor": {
"platform": "rfxtrx",
"automatic_add": False,
"devices": {},
}
},
)
await hass.async_block_till_done()
# Trying to add switch
await _signal_event(hass, "0b1100100118cdea02010f70")
assert hass.states.async_all() == []
async def test_off_delay(hass, rfxtrx):
"""Test with discovery."""
await async_setup_component(
hass,
"binary_sensor",
{
"binary_sensor": {
"platform": "rfxtrx",
"automatic_add": True,
"devices": {
"0b1100100118cdea02010f70": {"name": "Test", "off_delay": 5}
},
}
},
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "off"
await _signal_event(hass, "0b1100100118cdea02010f70")
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "on"
base_time = utcnow()
async_fire_time_changed(hass, base_time + timedelta(seconds=4))
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "on"
async_fire_time_changed(hass, base_time + timedelta(seconds=6))
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test")
assert state
assert state.state == "off"

View File

@ -1,5 +1,5 @@
"""The tests for the Rfxtrx cover platform."""
import RFXtrx as rfxtrxmod
from unittest.mock import call
from homeassistant.components import rfxtrx as rfxtrx_core
from homeassistant.setup import async_setup_component
@ -38,7 +38,7 @@ async def test_default_config(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 0 == len(rfxtrx_core.RFX_DEVICES)
assert len(hass.states.async_all()) == 0
async def test_one_cover(hass, rfxtrx):
@ -55,20 +55,25 @@ async def test_one_cover(hass, rfxtrx):
)
await hass.async_block_till_done()
hass.data[rfxtrx_core.DATA_RFXOBJECT] = rfxtrxmod.Core(
"", transport_protocol=rfxtrxmod.DummyTransport
assert len(hass.states.async_all()) == 1
await hass.services.async_call(
"cover", "open_cover", {"entity_id": "cover.test"}, blocking=True
)
assert 1 == len(rfxtrx_core.RFX_DEVICES)
for id in rfxtrx_core.RFX_DEVICES:
entity = rfxtrx_core.RFX_DEVICES[id]
entity.hass = hass
assert entity.signal_repetitions == 1
assert not entity.should_fire_event
assert not entity.should_poll
entity.open_cover()
entity.close_cover()
entity.stop_cover()
await hass.services.async_call(
"cover", "close_cover", {"entity_id": "cover.test"}, blocking=True
)
await hass.services.async_call(
"cover", "stop_cover", {"entity_id": "cover.test"}, blocking=True
)
assert rfxtrx.transport.send.mock_calls == [
call(bytearray(b"\n\x14\x00\x00\x02\x13\xc7\xf2\x0f\x00\x00")),
call(bytearray(b"\n\x14\x00\x00\x02\x13\xc7\xf2\r\x00\x00")),
call(bytearray(b"\n\x14\x00\x00\x02\x13\xc7\xf2\x0e\x00\x00")),
]
async def test_several_covers(hass, rfxtrx):
@ -90,19 +95,22 @@ async def test_several_covers(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 3 == len(rfxtrx_core.RFX_DEVICES)
device_num = 0
for id in rfxtrx_core.RFX_DEVICES:
entity = rfxtrx_core.RFX_DEVICES[id]
assert entity.signal_repetitions == 3
if entity.name == "Living":
device_num = device_num + 1
elif entity.name == "Bath":
device_num = device_num + 1
elif entity.name == "Test":
device_num = device_num + 1
state = hass.states.get("cover.test")
assert state
assert state.state == "closed"
assert state.attributes.get("friendly_name") == "Test"
assert 3 == device_num
state = hass.states.get("cover.bath")
assert state
assert state.state == "closed"
assert state.attributes.get("friendly_name") == "Bath"
state = hass.states.get("cover.living")
assert state
assert state.state == "closed"
assert state.attributes.get("friendly_name") == "Living"
assert len(hass.states.async_all()) == 3
async def test_discover_covers(hass, rfxtrx):
@ -114,35 +122,19 @@ async def test_discover_covers(hass, rfxtrx):
)
await hass.async_block_till_done()
event = rfxtrx_core.get_rfx_object("0a140002f38cae010f0070")
event.data = bytearray(
[0x0A, 0x14, 0x00, 0x02, 0xF3, 0x8C, 0xAE, 0x01, 0x0F, 0x00, 0x70]
)
await _signal_event(hass, "0a140002f38cae010f0070")
assert len(hass.states.async_all()) == 1
await _signal_event(hass, event)
assert 1 == len(rfxtrx_core.RFX_DEVICES)
event = rfxtrx_core.get_rfx_object("0a1400adf394ab020e0060")
event.data = bytearray(
[0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94, 0xAB, 0x02, 0x0E, 0x00, 0x60]
)
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a1400adf394ab020e0060")
assert len(hass.states.async_all()) == 2
# Trying to add a sensor
event = rfxtrx_core.get_rfx_object("0a52085e070100b31b0279")
event.data = bytearray(b"\nR\x08^\x07\x01\x00\xb3\x1b\x02y")
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a52085e070100b31b0279")
assert len(hass.states.async_all()) == 2
# Trying to add a light
event = rfxtrx_core.get_rfx_object("0b1100100118cdea02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x11, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x02, 0x0F, 0x70]
)
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0b1100100118cdea02010f70")
assert len(hass.states.async_all()) == 2
async def test_discover_cover_noautoadd(hass, rfxtrx):
@ -154,31 +146,8 @@ async def test_discover_cover_noautoadd(hass, rfxtrx):
)
await hass.async_block_till_done()
event = rfxtrx_core.get_rfx_object("0a1400adf394ab010d0060")
event.data = bytearray(
[0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94, 0xAB, 0x01, 0x0D, 0x00, 0x60]
)
await _signal_event(hass, "0a1400adf394ab010d0060")
assert len(hass.states.async_all()) == 0
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
event = rfxtrx_core.get_rfx_object("0a1400adf394ab020e0060")
event.data = bytearray(
[0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94, 0xAB, 0x02, 0x0E, 0x00, 0x60]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a sensor
event = rfxtrx_core.get_rfx_object("0a52085e070100b31b0279")
event.data = bytearray(b"\nR\x08^\x07\x01\x00\xb3\x1b\x02y")
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a light
event = rfxtrx_core.get_rfx_object("0b1100100118cdea02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x11, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x02, 0x0F, 0x70]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a1400adf394ab020e0060")
assert len(hass.states.async_all()) == 0

View File

@ -130,23 +130,17 @@ async def test_fire_event(hass):
calls.append(event)
hass.bus.async_listen(rfxtrx.EVENT_BUTTON_PRESSED, record_event)
await hass.async_block_till_done()
entity = rfxtrx.RFX_DEVICES["213c7f2_16"]
entity.update_state(False, 0)
assert "Test" == entity.name
assert "off" == entity.state
assert entity.should_fire_event
event = rfxtrx.get_rfx_object("0b1100cd0213c7f210010f51")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x01, 0x0F, 0x70]
)
await _signal_event(hass, event)
await hass.async_block_till_done()
state = hass.states.get("switch.test")
assert state
assert state.state == "off"
await _signal_event(hass, "0b1100cd0213c7f210010f51")
state = hass.states.get("switch.test")
assert state
assert state.state == "on"
assert event.values["Command"] == "On"
assert "on" == entity.state
assert hass.states.get("switch.test").state == "on"
assert 1 == len(calls)
assert calls[0].data == {"entity_id": "switch.test", "state": "on"}
@ -191,11 +185,7 @@ async def test_fire_event_sensor(hass):
calls.append(event)
hass.bus.async_listen("signal_received", record_event)
await hass.async_block_till_done()
event = rfxtrx.get_rfx_object("0a520802060101ff0f0269")
event.data = bytearray(b"\nR\x08\x01\x07\x01\x00\xb8\x1b\x02y")
await _signal_event(hass, event)
await hass.async_block_till_done()
await _signal_event(hass, "0a520802060101ff0f0269")
assert 1 == len(calls)
assert calls[0].data == {"entity_id": "sensor.test_temperature"}

View File

@ -1,4 +1,8 @@
"""The tests for the Rfxtrx light platform."""
from unittest.mock import call
import pytest
from homeassistant.components import rfxtrx as rfxtrx_core
from homeassistant.setup import async_setup_component
@ -70,62 +74,67 @@ async def test_one_light(hass, rfxtrx):
)
await hass.async_block_till_done()
import RFXtrx as rfxtrxmod
state = hass.states.get("light.test")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Test"
hass.data[rfxtrx_core.DATA_RFXOBJECT] = rfxtrxmod.Core(
"", transport_protocol=rfxtrxmod.DummyTransport
await hass.services.async_call(
"light", "turn_on", {"entity_id": "light.test"}, blocking=True
)
state = hass.states.get("light.test")
assert state.state == "on"
assert state.attributes.get("brightness") == 255
assert 1 == len(rfxtrx_core.RFX_DEVICES)
entity = rfxtrx_core.RFX_DEVICES["213c7f2_16"]
entity.hass = hass
assert "Test" == entity.name
assert "off" == entity.state
assert entity.assumed_state
assert entity.signal_repetitions == 1
assert not entity.should_fire_event
assert not entity.should_poll
await hass.services.async_call(
"light", "turn_off", {"entity_id": "light.test"}, blocking=True
)
state = hass.states.get("light.test")
assert state.state == "off"
assert state.attributes.get("brightness") is None
assert not entity.is_on
await hass.services.async_call(
"light",
"turn_on",
{"entity_id": "light.test", "brightness": 100},
blocking=True,
)
state = hass.states.get("light.test")
assert state.state == "on"
assert state.attributes.get("brightness") == 100
entity.turn_on()
assert entity.is_on
assert entity.brightness == 255
await hass.services.async_call(
"light", "turn_on", {"entity_id": "light.test", "brightness": 10}, blocking=True
)
state = hass.states.get("light.test")
assert state.state == "on"
assert state.attributes.get("brightness") == 10
entity.turn_off()
assert not entity.is_on
assert entity.brightness == 0
await hass.services.async_call(
"light",
"turn_on",
{"entity_id": "light.test", "brightness": 255},
blocking=True,
)
state = hass.states.get("light.test")
assert state.state == "on"
assert state.attributes.get("brightness") == 255
entity.turn_on(brightness=100)
assert entity.is_on
assert entity.brightness == 100
await hass.services.async_call(
"light", "turn_off", {"entity_id": "light.test"}, blocking=True
)
state = hass.states.get("light.test")
assert state.state == "off"
assert state.attributes.get("brightness") is None
entity.turn_on(brightness=10)
assert entity.is_on
assert entity.brightness == 10
entity.turn_on(brightness=255)
assert entity.is_on
assert entity.brightness == 255
entity.turn_off()
assert "Test" == entity.name
assert "off" == entity.state
entity.turn_on()
assert "on" == entity.state
entity.turn_off()
assert "off" == entity.state
entity.turn_on(brightness=100)
assert "on" == entity.state
entity.turn_on(brightness=10)
assert "on" == entity.state
entity.turn_on(brightness=255)
assert "on" == entity.state
assert rfxtrx.transport.send.mock_calls == [
call(bytearray(b"\x0b\x11\x00\x00\x02\x13\xc7\xf2\x10\x01\x00\x00")),
call(bytearray(b"\x0b\x11\x00\x00\x02\x13\xc7\xf2\x10\x00\x00\x00")),
call(bytearray(b"\x0b\x11\x00\x00\x02\x13\xc7\xf2\x10\x02\x06\x00")),
call(bytearray(b"\x0b\x11\x00\x00\x02\x13\xc7\xf2\x10\x02\x00\x00")),
call(bytearray(b"\x0b\x11\x00\x00\x02\x13\xc7\xf2\x10\x02\x0f\x00")),
call(bytearray(b"\x0b\x11\x00\x00\x02\x13\xc7\xf2\x10\x00\x00\x00")),
]
async def test_several_lights(hass, rfxtrx):
@ -147,25 +156,48 @@ async def test_several_lights(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 3 == len(rfxtrx_core.RFX_DEVICES)
device_num = 0
for id in rfxtrx_core.RFX_DEVICES:
entity = rfxtrx_core.RFX_DEVICES[id]
assert entity.signal_repetitions == 3
if entity.name == "Living":
device_num = device_num + 1
assert "off" == entity.state
assert "<Entity Living: off>" == entity.__str__()
elif entity.name == "Bath":
device_num = device_num + 1
assert "off" == entity.state
assert "<Entity Bath: off>" == entity.__str__()
elif entity.name == "Test":
device_num = device_num + 1
assert "off" == entity.state
assert "<Entity Test: off>" == entity.__str__()
assert len(hass.states.async_all()) == 3
assert 3 == device_num
state = hass.states.get("light.test")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Test"
state = hass.states.get("light.bath")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Bath"
state = hass.states.get("light.living")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Living"
assert len(hass.states.async_all()) == 3
@pytest.mark.parametrize("repetitions", [1, 3])
async def test_repetitions(hass, rfxtrx, repetitions):
"""Test signal repetitions."""
await async_setup_component(
hass,
"light",
{
"light": {
"platform": "rfxtrx",
"signal_repetitions": repetitions,
"devices": {"0b1100cd0213c7f230010f71": {"name": "Test"}},
}
},
)
await hass.async_block_till_done()
await hass.services.async_call(
"light", "turn_on", {"entity_id": "light.test"}, blocking=True
)
await hass.async_block_till_done()
assert rfxtrx.transport.send.call_count == repetitions
async def test_discover_light(hass, rfxtrx):
@ -177,52 +209,17 @@ async def test_discover_light(hass, rfxtrx):
)
await hass.async_block_till_done()
event = rfxtrx_core.get_rfx_object("0b11009e00e6116202020070")
event.data = bytearray(b"\x0b\x11\x00\x9e\x00\xe6\x11b\x02\x02\x00p")
await _signal_event(hass, "0b11009e00e6116202020070")
state = hass.states.get("light.0b11009e00e6116202020070")
assert state
assert state.state == "on"
assert state.attributes.get("friendly_name") == "0b11009e00e6116202020070"
await _signal_event(hass, event)
entity = rfxtrx_core.RFX_DEVICES["0e61162_2"]
assert 1 == len(rfxtrx_core.RFX_DEVICES)
assert "<Entity 0b11009e00e6116202020070: on>" == entity.__str__()
event = rfxtrx_core.get_rfx_object("0b11009e00e6116201010070")
event.data = bytearray(b"\x0b\x11\x00\x9e\x00\xe6\x11b\x01\x01\x00p")
await _signal_event(hass, event)
assert 1 == len(rfxtrx_core.RFX_DEVICES)
event = rfxtrx_core.get_rfx_object("0b1100120118cdea02020070")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x12, 0x01, 0x18, 0xCD, 0xEA, 0x02, 0x02, 0x00, 0x70]
)
await _signal_event(hass, event)
entity = rfxtrx_core.RFX_DEVICES["118cdea_2"]
assert 2 == len(rfxtrx_core.RFX_DEVICES)
assert "<Entity 0b1100120118cdea02020070: on>" == entity.__str__()
# trying to add a sensor
event = rfxtrx_core.get_rfx_object("0a52085e070100b31b0279")
event.data = bytearray(b"\nR\x08^\x07\x01\x00\xb3\x1b\x02y")
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
# trying to add a switch
event = rfxtrx_core.get_rfx_object("0b1100100118cdea02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x01, 0x0F, 0x70]
)
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a rollershutter
event = rfxtrx_core.get_rfx_object("0a1400adf394ab020e0060")
event.data = bytearray(
[0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94, 0xAB, 0x02, 0x0E, 0x00, 0x60]
)
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0b1100120118cdea02020070")
state = hass.states.get("light.0b1100120118cdea02020070")
assert state
assert state.state == "on"
assert state.attributes.get("friendly_name") == "0b1100120118cdea02020070"
async def test_discover_light_noautoadd(hass, rfxtrx):
@ -234,48 +231,11 @@ async def test_discover_light_noautoadd(hass, rfxtrx):
)
await hass.async_block_till_done()
event = rfxtrx_core.get_rfx_object("0b1100120118cdea02020070")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x12, 0x01, 0x18, 0xCD, 0xEA, 0x02, 0x02, 0x00, 0x70]
)
await _signal_event(hass, "0b1100120118cdea02020070")
assert hass.states.async_all() == []
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0b1100120118cdea02010070")
assert hass.states.async_all() == []
event = rfxtrx_core.get_rfx_object("0b1100120118cdea02010070")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x12, 0x01, 0x18, 0xCD, 0xEA, 0x02, 0x01, 0x00, 0x70]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
event = rfxtrx_core.get_rfx_object("0b1100120118cdea02020070")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x12, 0x01, 0x18, 0xCD, 0xEA, 0x02, 0x02, 0x00, 0x70]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a sensor
event = rfxtrx_core.get_rfx_object("0a52085e070100b31b0279")
event.data = bytearray(b"\nR\x08^\x07\x01\x00\xb3\x1b\x02y")
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a switch
event = rfxtrx_core.get_rfx_object("0b1100100118cdea02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x01, 0x0F, 0x70]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a rollershutter
event = rfxtrx_core.get_rfx_object("0a1400adf394ab020e0060")
event.data = bytearray(
[0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94, 0xAB, 0x02, 0x0E, 0x00, 0x60]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0b1100120118cdea02020070")
assert hass.states.async_all() == []

View File

@ -1,5 +1,4 @@
"""The tests for the Rfxtrx sensor platform."""
from homeassistant.components import rfxtrx as rfxtrx_core
from homeassistant.const import TEMP_CELSIUS, UNIT_PERCENTAGE
from homeassistant.setup import async_setup_component
@ -13,7 +12,7 @@ async def test_default_config(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 0 == len(rfxtrx_core.RFX_DEVICES)
assert len(hass.states.async_all()) == 0
async def test_one_sensor(hass, rfxtrx):
@ -35,11 +34,11 @@ async def test_one_sensor(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 1 == len(rfxtrx_core.RFX_DEVICES)
entity = rfxtrx_core.RFX_DEVICES["sensor_05_02"]["Temperature"]
assert "Test Temperature" == entity.name
assert TEMP_CELSIUS == entity.unit_of_measurement
assert entity.state is None
state = hass.states.get("sensor.test_temperature")
assert state
assert state.state == "unknown"
assert state.attributes.get("friendly_name") == "Test Temperature"
assert state.attributes.get("unit_of_measurement") == TEMP_CELSIUS
async def test_one_sensor_no_datatype(hass, rfxtrx):
@ -56,11 +55,11 @@ async def test_one_sensor_no_datatype(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 1 == len(rfxtrx_core.RFX_DEVICES)
entity = rfxtrx_core.RFX_DEVICES["sensor_05_02"]["Temperature"]
assert "Test Temperature" == entity.name
assert TEMP_CELSIUS == entity.unit_of_measurement
assert entity.state is None
state = hass.states.get("sensor.test_temperature")
assert state
assert state.state == "unknown"
assert state.attributes.get("friendly_name") == "Test Temperature"
assert state.attributes.get("unit_of_measurement") == TEMP_CELSIUS
async def test_several_sensors(hass, rfxtrx):
@ -86,27 +85,23 @@ async def test_several_sensors(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 2 == len(rfxtrx_core.RFX_DEVICES)
device_num = 0
for id in rfxtrx_core.RFX_DEVICES:
if id == "sensor_06_01":
device_num = device_num + 1
assert len(rfxtrx_core.RFX_DEVICES[id]) == 2
_entity_temp = rfxtrx_core.RFX_DEVICES[id]["Temperature"]
_entity_hum = rfxtrx_core.RFX_DEVICES[id]["Humidity"]
assert UNIT_PERCENTAGE == _entity_hum.unit_of_measurement
assert "Bath" == _entity_hum.__str__()
assert _entity_hum.state is None
assert TEMP_CELSIUS == _entity_temp.unit_of_measurement
assert "Bath" == _entity_temp.__str__()
elif id == "sensor_05_02":
device_num = device_num + 1
entity = rfxtrx_core.RFX_DEVICES[id]["Temperature"]
assert entity.state is None
assert TEMP_CELSIUS == entity.unit_of_measurement
assert "Test" == entity.__str__()
state = hass.states.get("sensor.test_temperature")
assert state
assert state.state == "unknown"
assert state.attributes.get("friendly_name") == "Test Temperature"
assert state.attributes.get("unit_of_measurement") == TEMP_CELSIUS
assert 2 == device_num
state = hass.states.get("sensor.bath_temperature")
assert state
assert state.state == "unknown"
assert state.attributes.get("friendly_name") == "Bath Temperature"
assert state.attributes.get("unit_of_measurement") == TEMP_CELSIUS
state = hass.states.get("sensor.bath_humidity")
assert state
assert state.state == "unknown"
assert state.attributes.get("friendly_name") == "Bath Humidity"
assert state.attributes.get("unit_of_measurement") == UNIT_PERCENTAGE
async def test_discover_sensor(hass, rfxtrx):
@ -118,59 +113,61 @@ async def test_discover_sensor(hass, rfxtrx):
)
await hass.async_block_till_done()
event = rfxtrx_core.get_rfx_object("0a520801070100b81b0279")
event.data = bytearray(b"\nR\x08\x01\x07\x01\x00\xb8\x1b\x02y")
await _signal_event(hass, event)
await _signal_event(hass, "0a520801070100b81b0279")
state = hass.states.get("sensor.0a520801070100b81b0279_temperature")
assert state
assert state.state == "18.4"
assert (
state.attributes.items()
>= {
"friendly_name": "0a520801070100b81b0279 Temperature",
"unit_of_measurement": TEMP_CELSIUS,
"Humidity status": "normal",
"Temperature": 18.4,
"Rssi numeric": 7,
"Humidity": 27,
"Battery numeric": 9,
"Humidity status numeric": 2,
}.items()
)
entity = rfxtrx_core.RFX_DEVICES["sensor_07_01"]["Temperature"]
assert 1 == len(rfxtrx_core.RFX_DEVICES)
assert {
"Humidity status": "normal",
"Temperature": 18.4,
"Rssi numeric": 7,
"Humidity": 27,
"Battery numeric": 9,
"Humidity status numeric": 2,
} == entity.device_state_attributes
assert "0a520801070100b81b0279" == entity.__str__()
await _signal_event(hass, "0a52080405020095240279")
state = hass.states.get("sensor.0a52080405020095240279_temperature")
assert state
assert state.state == "14.9"
assert (
state.attributes.items()
>= {
"friendly_name": "0a52080405020095240279 Temperature",
"unit_of_measurement": TEMP_CELSIUS,
"Humidity status": "normal",
"Temperature": 14.9,
"Rssi numeric": 7,
"Humidity": 36,
"Battery numeric": 9,
"Humidity status numeric": 2,
}.items()
)
await _signal_event(hass, event)
assert 1 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a52085e070100b31b0279")
state = hass.states.get("sensor.0a520801070100b81b0279_temperature")
assert state
assert state.state == "17.9"
assert (
state.attributes.items()
>= {
"friendly_name": "0a520801070100b81b0279 Temperature",
"unit_of_measurement": TEMP_CELSIUS,
"Humidity status": "normal",
"Temperature": 17.9,
"Rssi numeric": 7,
"Humidity": 27,
"Battery numeric": 9,
"Humidity status numeric": 2,
}.items()
)
event = rfxtrx_core.get_rfx_object("0a52080405020095240279")
event.data = bytearray(b"\nR\x08\x04\x05\x02\x00\x95$\x02y")
await _signal_event(hass, event)
entity = rfxtrx_core.RFX_DEVICES["sensor_05_02"]["Temperature"]
assert 2 == len(rfxtrx_core.RFX_DEVICES)
assert {
"Humidity status": "normal",
"Temperature": 14.9,
"Rssi numeric": 7,
"Humidity": 36,
"Battery numeric": 9,
"Humidity status numeric": 2,
} == entity.device_state_attributes
assert "0a52080405020095240279" == entity.__str__()
event = rfxtrx_core.get_rfx_object("0a52085e070100b31b0279")
event.data = bytearray(b"\nR\x08^\x07\x01\x00\xb3\x1b\x02y")
await _signal_event(hass, event)
entity = rfxtrx_core.RFX_DEVICES["sensor_07_01"]["Temperature"]
assert 2 == len(rfxtrx_core.RFX_DEVICES)
assert {
"Humidity status": "normal",
"Temperature": 17.9,
"Rssi numeric": 7,
"Humidity": 27,
"Battery numeric": 9,
"Humidity status numeric": 2,
} == entity.device_state_attributes
assert "0a520801070100b81b0279" == entity.__str__()
# trying to add a switch
event = rfxtrx_core.get_rfx_object("0b1100cd0213c7f210010f70")
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
assert len(hass.states.async_all()) == 2
async def test_discover_sensor_noautoadd(hass, rfxtrx):
@ -182,25 +179,14 @@ async def test_discover_sensor_noautoadd(hass, rfxtrx):
)
await hass.async_block_till_done()
event = rfxtrx_core.get_rfx_object("0a520801070100b81b0279")
event.data = bytearray(b"\nR\x08\x01\x07\x01\x00\xb8\x1b\x02y")
await _signal_event(hass, "0a520801070100b81b0279")
assert len(hass.states.async_all()) == 0
assert 0 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a52080405020095240279")
assert len(hass.states.async_all()) == 0
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
event = rfxtrx_core.get_rfx_object("0a52080405020095240279")
event.data = bytearray(b"\nR\x08\x04\x05\x02\x00\x95$\x02y")
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
event = rfxtrx_core.get_rfx_object("0a52085e070100b31b0279")
event.data = bytearray(b"\nR\x08^\x07\x01\x00\xb3\x1b\x02y")
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a52085e070100b31b0279")
assert len(hass.states.async_all()) == 0
async def test_update_of_sensors(hass, rfxtrx):
@ -226,83 +212,93 @@ async def test_update_of_sensors(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 2 == len(rfxtrx_core.RFX_DEVICES)
device_num = 0
for id in rfxtrx_core.RFX_DEVICES:
if id == "sensor_06_01":
device_num = device_num + 1
assert len(rfxtrx_core.RFX_DEVICES[id]) == 2
_entity_temp = rfxtrx_core.RFX_DEVICES[id]["Temperature"]
_entity_hum = rfxtrx_core.RFX_DEVICES[id]["Humidity"]
assert UNIT_PERCENTAGE == _entity_hum.unit_of_measurement
assert "Bath" == _entity_hum.__str__()
assert _entity_temp.state is None
assert TEMP_CELSIUS == _entity_temp.unit_of_measurement
assert "Bath" == _entity_temp.__str__()
elif id == "sensor_05_02":
device_num = device_num + 1
entity = rfxtrx_core.RFX_DEVICES[id]["Temperature"]
assert entity.state is None
assert TEMP_CELSIUS == entity.unit_of_measurement
assert "Test" == entity.__str__()
state = hass.states.get("sensor.test_temperature")
assert state
assert state.state == "unknown"
assert (
state.attributes.items()
>= {
"friendly_name": "Test Temperature",
"unit_of_measurement": TEMP_CELSIUS,
}.items()
)
assert 2 == device_num
state = hass.states.get("sensor.bath_temperature")
assert state
assert state.state == "unknown"
assert (
state.attributes.items()
>= {
"friendly_name": "Bath Temperature",
"unit_of_measurement": TEMP_CELSIUS,
}.items()
)
event = rfxtrx_core.get_rfx_object("0a520802060101ff0f0269")
event.data = bytearray(b"\nR\x08\x01\x07\x01\x00\xb8\x1b\x02y")
await _signal_event(hass, event)
state = hass.states.get("sensor.bath_humidity")
assert state
assert state.state == "unknown"
assert (
state.attributes.items()
>= {
"friendly_name": "Bath Humidity",
"unit_of_measurement": UNIT_PERCENTAGE,
}.items()
)
await _signal_event(hass, event)
event = rfxtrx_core.get_rfx_object("0a52080705020085220269")
event.data = bytearray(b"\nR\x08\x04\x05\x02\x00\x95$\x02y")
await _signal_event(hass, event)
assert len(hass.states.async_all()) == 3
assert 2 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a520802060101ff0f0269")
await _signal_event(hass, "0a52080705020085220269")
device_num = 0
for id in rfxtrx_core.RFX_DEVICES:
if id == "sensor_06_01":
device_num = device_num + 1
assert len(rfxtrx_core.RFX_DEVICES[id]) == 2
_entity_temp = rfxtrx_core.RFX_DEVICES[id]["Temperature"]
_entity_hum = rfxtrx_core.RFX_DEVICES[id]["Humidity"]
assert UNIT_PERCENTAGE == _entity_hum.unit_of_measurement
assert 15 == _entity_hum.state
assert {
"Battery numeric": 9,
"Temperature": 51.1,
"Humidity": 15,
"Humidity status": "normal",
"Humidity status numeric": 2,
"Rssi numeric": 6,
} == _entity_hum.device_state_attributes
assert "Bath" == _entity_hum.__str__()
state = hass.states.get("sensor.test_temperature")
assert state
assert state.state == "13.3"
assert (
state.attributes.items()
>= {
"friendly_name": "Test Temperature",
"unit_of_measurement": TEMP_CELSIUS,
"Battery numeric": 9,
"Temperature": 13.3,
"Humidity": 34,
"Humidity status": "normal",
"Humidity status numeric": 2,
"Rssi numeric": 6,
}.items()
)
assert TEMP_CELSIUS == _entity_temp.unit_of_measurement
assert 51.1 == _entity_temp.state
assert {
"Battery numeric": 9,
"Temperature": 51.1,
"Humidity": 15,
"Humidity status": "normal",
"Humidity status numeric": 2,
"Rssi numeric": 6,
} == _entity_temp.device_state_attributes
assert "Bath" == _entity_temp.__str__()
elif id == "sensor_05_02":
device_num = device_num + 1
entity = rfxtrx_core.RFX_DEVICES[id]["Temperature"]
assert TEMP_CELSIUS == entity.unit_of_measurement
assert 13.3 == entity.state
assert {
"Humidity status": "normal",
"Temperature": 13.3,
"Rssi numeric": 6,
"Humidity": 34,
"Battery numeric": 9,
"Humidity status numeric": 2,
} == entity.device_state_attributes
assert "Test" == entity.__str__()
state = hass.states.get("sensor.bath_temperature")
assert state
assert state.state == "51.1"
assert (
state.attributes.items()
>= {
"friendly_name": "Bath Temperature",
"unit_of_measurement": TEMP_CELSIUS,
"Battery numeric": 9,
"Temperature": 51.1,
"Humidity": 15,
"Humidity status": "normal",
"Humidity status numeric": 2,
"Rssi numeric": 6,
}.items()
)
assert 2 == device_num
assert 2 == len(rfxtrx_core.RFX_DEVICES)
state = hass.states.get("sensor.bath_humidity")
assert state
assert state.state == "15"
assert (
state.attributes.items()
>= {
"friendly_name": "Bath Humidity",
"unit_of_measurement": UNIT_PERCENTAGE,
"Battery numeric": 9,
"Temperature": 51.1,
"Humidity": 15,
"Humidity status": "normal",
"Humidity status numeric": 2,
"Rssi numeric": 6,
}.items()
)
assert len(hass.states.async_all()) == 3

View File

@ -1,5 +1,7 @@
"""The tests for the RFXtrx switch platform."""
import RFXtrx as rfxtrxmod
from unittest.mock import call
import pytest
from homeassistant.components import rfxtrx as rfxtrx_core
from homeassistant.setup import async_setup_component
@ -82,8 +84,7 @@ async def test_default_config(hass, rfxtrx):
hass, "switch", {"switch": {"platform": "rfxtrx", "devices": {}}}
)
await hass.async_block_till_done()
assert 0 == len(rfxtrx_core.RFX_DEVICES)
assert hass.states.async_all() == []
async def test_one_switch(hass, rfxtrx):
@ -100,32 +101,29 @@ async def test_one_switch(hass, rfxtrx):
)
await hass.async_block_till_done()
hass.data[rfxtrx_core.DATA_RFXOBJECT] = rfxtrxmod.Core(
"", transport_protocol=rfxtrxmod.DummyTransport
state = hass.states.get("switch.test")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Test"
await hass.services.async_call(
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
)
assert 1 == len(rfxtrx_core.RFX_DEVICES)
entity = rfxtrx_core.RFX_DEVICES["213c7f2_16"]
entity.hass = hass
assert "Test" == entity.name
assert "off" == entity.state
assert entity.assumed_state
assert entity.signal_repetitions == 1
assert not entity.should_fire_event
assert not entity.should_poll
state = hass.states.get("switch.test")
assert state.state == "on"
assert not entity.is_on
entity.turn_on()
assert entity.is_on
entity.turn_off()
assert not entity.is_on
await hass.services.async_call(
"switch", "turn_off", {"entity_id": "switch.test"}, blocking=True
)
assert "Test" == entity.name
assert "off" == entity.state
entity.turn_on()
assert "on" == entity.state
entity.turn_off()
assert "off" == entity.state
state = hass.states.get("switch.test")
assert state.state == "off"
assert rfxtrx.transport.send.mock_calls == [
call(bytearray(b"\x0b\x11\x00\x00\x02\x13\xc7\xf2\x10\x01\x00\x00")),
call(bytearray(b"\x0b\x11\x00\x00\x02\x13\xc7\xf2\x10\x00\x00\x00")),
]
async def test_several_switches(hass, rfxtrx):
@ -147,25 +145,44 @@ async def test_several_switches(hass, rfxtrx):
)
await hass.async_block_till_done()
assert 3 == len(rfxtrx_core.RFX_DEVICES)
device_num = 0
for id in rfxtrx_core.RFX_DEVICES:
entity = rfxtrx_core.RFX_DEVICES[id]
assert entity.signal_repetitions == 3
if entity.name == "Living":
device_num = device_num + 1
assert "off" == entity.state
assert "<Entity Living: off>" == entity.__str__()
elif entity.name == "Bath":
device_num = device_num + 1
assert "off" == entity.state
assert "<Entity Bath: off>" == entity.__str__()
elif entity.name == "Test":
device_num = device_num + 1
assert "off" == entity.state
assert "<Entity Test: off>" == entity.__str__()
state = hass.states.get("switch.test")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Test"
assert 3 == device_num
state = hass.states.get("switch.bath")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Bath"
state = hass.states.get("switch.living")
assert state
assert state.state == "off"
assert state.attributes.get("friendly_name") == "Living"
@pytest.mark.parametrize("repetitions", [1, 3])
async def test_repetitions(hass, rfxtrx, repetitions):
"""Test signal repetitions."""
await async_setup_component(
hass,
"switch",
{
"switch": {
"platform": "rfxtrx",
"signal_repetitions": repetitions,
"devices": {"0b1100cd0213c7f230010f71": {"name": "Test"}},
}
},
)
await hass.async_block_till_done()
await hass.services.async_call(
"switch", "turn_on", {"entity_id": "switch.test"}, blocking=True
)
await hass.async_block_till_done()
assert rfxtrx.transport.send.call_count == repetitions
async def test_discover_switch(hass, rfxtrx):
@ -177,50 +194,30 @@ async def test_discover_switch(hass, rfxtrx):
)
await hass.async_block_till_done()
event = rfxtrx_core.get_rfx_object("0b1100100118cdea02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x01, 0x0F, 0x70]
)
await _signal_event(hass, "0b1100100118cdea02010f70")
state = hass.states.get("switch.0b1100100118cdea02010f70")
assert state
assert state.state == "on"
await _signal_event(hass, event)
entity = rfxtrx_core.RFX_DEVICES["118cdea_2"]
assert 1 == len(rfxtrx_core.RFX_DEVICES)
assert "<Entity 0b1100100118cdea01010f70: on>" == entity.__str__()
await _signal_event(hass, event)
assert 1 == len(rfxtrx_core.RFX_DEVICES)
event = rfxtrx_core.get_rfx_object("0b1100100118cdeb02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x12, 0x01, 0x18, 0xCD, 0xEA, 0x02, 0x00, 0x00, 0x70]
)
await _signal_event(hass, event)
entity = rfxtrx_core.RFX_DEVICES["118cdeb_2"]
assert 2 == len(rfxtrx_core.RFX_DEVICES)
assert "<Entity 0b1100120118cdea02000070: on>" == entity.__str__()
await _signal_event(hass, "0b1100100118cdeb02010f70")
state = hass.states.get("switch.0b1100100118cdeb02010f70")
assert state
assert state.state == "on"
# Trying to add a sensor
event = rfxtrx_core.get_rfx_object("0a52085e070100b31b0279")
event.data = bytearray(b"\nR\x08^\x07\x01\x00\xb3\x1b\x02y")
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a52085e070100b31b0279")
state = hass.states.get("sensor.0a52085e070100b31b0279")
assert state is None
# Trying to add a light
event = rfxtrx_core.get_rfx_object("0b1100100118cdea02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x11, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x02, 0x0F, 0x70]
)
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0b1100100118cdea02010f70")
state = hass.states.get("light.0b1100100118cdea02010f70")
assert state is None
# Trying to add a rollershutter
event = rfxtrx_core.get_rfx_object("0a1400adf394ab020e0060")
event.data = bytearray(
[0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94, 0xAB, 0x02, 0x0E, 0x00, 0x60]
)
await _signal_event(hass, event)
assert 2 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, "0a1400adf394ab020e0060")
state = hass.states.get("cover.0a1400adf394ab020e0060")
assert state is None
async def test_discover_switch_noautoadd(hass, rfxtrx):
@ -232,43 +229,6 @@ async def test_discover_switch_noautoadd(hass, rfxtrx):
)
await hass.async_block_till_done()
event = rfxtrx_core.get_rfx_object("0b1100100118cdea02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x01, 0x0F, 0x70]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
event = rfxtrx_core.get_rfx_object("0b1100100118cdeb02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x00, 0x12, 0x01, 0x18, 0xCD, 0xEA, 0x02, 0x00, 0x00, 0x70]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a sensor
event = rfxtrx_core.get_rfx_object("0a52085e070100b31b0279")
event.data = bytearray(b"\nR\x08^\x07\x01\x00\xb3\x1b\x02y")
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a light
event = rfxtrx_core.get_rfx_object("0b1100100118cdea02010f70")
event.data = bytearray(
[0x0B, 0x11, 0x11, 0x10, 0x01, 0x18, 0xCD, 0xEA, 0x01, 0x02, 0x0F, 0x70]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add a rollershutter
event = rfxtrx_core.get_rfx_object("0a1400adf394ab020e0060")
event.data = bytearray(
[0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94, 0xAB, 0x02, 0x0E, 0x00, 0x60]
)
await _signal_event(hass, event)
assert 0 == len(rfxtrx_core.RFX_DEVICES)
# Trying to add switch
await _signal_event(hass, "0b1100100118cdea02010f70")
assert hass.states.async_all() == []