mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 16:57:53 +00:00
Add binary sensors to Risco integration (#39137)
* Add binary sensors to Risco integration * Minor cleanups * RiscoEntity base class * Platinum score * Remove alarm parameter in _setup_risco * Avoid zones and partitions sharing unique ids
This commit is contained in:
parent
73328dab5e
commit
3198233b8f
@ -19,7 +19,7 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, Upda
|
||||
|
||||
from .const import DATA_COORDINATOR, DEFAULT_SCAN_INTERVAL, DOMAIN
|
||||
|
||||
PLATFORMS = ["alarm_control_panel"]
|
||||
PLATFORMS = ["alarm_control_panel", "binary_sensor"]
|
||||
UNDO_UPDATE_LISTENER = "undo_update_listener"
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
@ -15,6 +15,7 @@ from homeassistant.const import (
|
||||
)
|
||||
|
||||
from .const import DATA_COORDINATOR, DOMAIN
|
||||
from .entity import RiscoEntity
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -37,39 +38,17 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
|
||||
async_add_entities(entities, False)
|
||||
|
||||
|
||||
class RiscoAlarm(AlarmControlPanelEntity):
|
||||
class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity):
|
||||
"""Representation of a Risco partition."""
|
||||
|
||||
def __init__(self, coordinator, partition_id):
|
||||
"""Init the partition."""
|
||||
self._coordinator = coordinator
|
||||
super().__init__(coordinator)
|
||||
self._partition_id = partition_id
|
||||
self._partition = self._coordinator.data.partitions[self._partition_id]
|
||||
|
||||
@property
|
||||
def should_poll(self):
|
||||
"""No need to poll. Coordinator notifies entity of updates."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
"""Return if entity is available."""
|
||||
return self._coordinator.last_update_success
|
||||
|
||||
def _refresh_from_coordinator(self):
|
||||
def _get_data_from_coordinator(self):
|
||||
self._partition = self._coordinator.data.partitions[self._partition_id]
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_added_to_hass(self):
|
||||
"""When entity is added to hass."""
|
||||
self.async_on_remove(
|
||||
self._coordinator.async_add_listener(self._refresh_from_coordinator)
|
||||
)
|
||||
|
||||
@property
|
||||
def _risco(self):
|
||||
"""Return the Risco API object."""
|
||||
return self._coordinator.risco
|
||||
|
||||
@property
|
||||
def device_info(self):
|
||||
@ -132,10 +111,3 @@ class RiscoAlarm(AlarmControlPanelEntity):
|
||||
alarm = await getattr(self._risco, method)(self._partition_id)
|
||||
self._partition = alarm.partitions[self._partition_id]
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_update(self):
|
||||
"""Update the entity.
|
||||
|
||||
Only used by the generic entity update service.
|
||||
"""
|
||||
await self._coordinator.async_request_refresh()
|
||||
|
66
homeassistant/components/risco/binary_sensor.py
Normal file
66
homeassistant/components/risco/binary_sensor.py
Normal file
@ -0,0 +1,66 @@
|
||||
"""Support for Risco alarm zones."""
|
||||
from homeassistant.components.binary_sensor import (
|
||||
DEVICE_CLASS_MOTION,
|
||||
BinarySensorEntity,
|
||||
)
|
||||
|
||||
from .const import DATA_COORDINATOR, DOMAIN
|
||||
from .entity import RiscoEntity
|
||||
|
||||
|
||||
async def async_setup_entry(hass, config_entry, async_add_entities):
|
||||
"""Set up the Risco alarm control panel."""
|
||||
coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR]
|
||||
entities = [
|
||||
RiscoBinarySensor(coordinator, zone_id, zone)
|
||||
for zone_id, zone in coordinator.data.zones.items()
|
||||
]
|
||||
|
||||
async_add_entities(entities, False)
|
||||
|
||||
|
||||
class RiscoBinarySensor(BinarySensorEntity, RiscoEntity):
|
||||
"""Representation of a Risco zone as a binary sensor."""
|
||||
|
||||
def __init__(self, coordinator, zone_id, zone):
|
||||
"""Init the zone."""
|
||||
super().__init__(coordinator)
|
||||
self._zone_id = zone_id
|
||||
self._zone = zone
|
||||
|
||||
def _get_data_from_coordinator(self):
|
||||
self._zone = self._coordinator.data.zones[self._zone_id]
|
||||
|
||||
@property
|
||||
def device_info(self):
|
||||
"""Return device info for this device."""
|
||||
return {
|
||||
"identifiers": {(DOMAIN, self.unique_id)},
|
||||
"name": self.name,
|
||||
"manufacturer": "Risco",
|
||||
}
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the zone."""
|
||||
return self._zone.name
|
||||
|
||||
@property
|
||||
def unique_id(self):
|
||||
"""Return a unique id for this zone."""
|
||||
return f"{self._risco.site_uuid}_zone_{self._zone_id}"
|
||||
|
||||
@property
|
||||
def device_state_attributes(self):
|
||||
"""Return the state attributes."""
|
||||
return {"bypassed": self._zone.bypassed}
|
||||
|
||||
@property
|
||||
def is_on(self):
|
||||
"""Return true if sensor is on."""
|
||||
return self._zone.triggered
|
||||
|
||||
@property
|
||||
def device_class(self):
|
||||
"""Return the class of this sensor, from DEVICE_CLASSES."""
|
||||
return DEVICE_CLASS_MOTION
|
45
homeassistant/components/risco/entity.py
Normal file
45
homeassistant/components/risco/entity.py
Normal file
@ -0,0 +1,45 @@
|
||||
"""A risco entity base class."""
|
||||
from homeassistant.helpers.entity import Entity
|
||||
|
||||
|
||||
class RiscoEntity(Entity):
|
||||
"""Risco entity base class."""
|
||||
|
||||
def __init__(self, coordinator):
|
||||
"""Init the instance."""
|
||||
self._coordinator = coordinator
|
||||
|
||||
@property
|
||||
def should_poll(self):
|
||||
"""No need to poll. Coordinator notifies entity of updates."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
"""Return if entity is available."""
|
||||
return self._coordinator.last_update_success
|
||||
|
||||
def _get_data_from_coordinator(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def _refresh_from_coordinator(self):
|
||||
self._get_data_from_coordinator()
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_added_to_hass(self):
|
||||
"""When entity is added to hass."""
|
||||
self.async_on_remove(
|
||||
self._coordinator.async_add_listener(self._refresh_from_coordinator)
|
||||
)
|
||||
|
||||
@property
|
||||
def _risco(self):
|
||||
"""Return the Risco API object."""
|
||||
return self._coordinator.risco
|
||||
|
||||
async def async_update(self):
|
||||
"""Update the entity.
|
||||
|
||||
Only used by the generic entity update service.
|
||||
"""
|
||||
await self._coordinator.async_request_refresh()
|
@ -8,5 +8,6 @@
|
||||
],
|
||||
"codeowners": [
|
||||
"@OnFreund"
|
||||
]
|
||||
],
|
||||
"quality_scale": "platinum"
|
||||
}
|
@ -63,7 +63,7 @@ def two_part_alarm():
|
||||
yield alarm_mock
|
||||
|
||||
|
||||
async def _setup_risco(hass, alarm=MagicMock()):
|
||||
async def _setup_risco(hass):
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
@ -121,7 +121,7 @@ async def test_setup(hass, two_part_alarm):
|
||||
assert not registry.async_is_registered(FIRST_ENTITY_ID)
|
||||
assert not registry.async_is_registered(SECOND_ENTITY_ID)
|
||||
|
||||
await _setup_risco(hass, two_part_alarm)
|
||||
await _setup_risco(hass)
|
||||
|
||||
assert registry.async_is_registered(FIRST_ENTITY_ID)
|
||||
assert registry.async_is_registered(SECOND_ENTITY_ID)
|
||||
@ -146,7 +146,7 @@ async def _check_state(hass, alarm, property, state, entity_id, partition_id):
|
||||
|
||||
async def test_states(hass, two_part_alarm):
|
||||
"""Test the various alarm states."""
|
||||
await _setup_risco(hass, two_part_alarm)
|
||||
await _setup_risco(hass)
|
||||
|
||||
assert hass.states.get(FIRST_ENTITY_ID).state == STATE_UNKNOWN
|
||||
await _check_state(
|
||||
@ -207,7 +207,7 @@ async def _call_alarm_service(hass, service, entity_id):
|
||||
|
||||
async def test_sets(hass, two_part_alarm):
|
||||
"""Test settings the various modes."""
|
||||
await _setup_risco(hass, two_part_alarm)
|
||||
await _setup_risco(hass)
|
||||
|
||||
await _test_servie_call(hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0)
|
||||
await _test_servie_call(hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1)
|
||||
|
156
tests/components/risco/test_binary_sensor.py
Normal file
156
tests/components/risco/test_binary_sensor.py
Normal file
@ -0,0 +1,156 @@
|
||||
"""Tests for the Risco binary sensors."""
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.risco import CannotConnectError, UnauthorizedError
|
||||
from homeassistant.components.risco.const import DOMAIN
|
||||
from homeassistant.const import (
|
||||
CONF_PASSWORD,
|
||||
CONF_PIN,
|
||||
CONF_USERNAME,
|
||||
STATE_OFF,
|
||||
STATE_ON,
|
||||
)
|
||||
from homeassistant.helpers.entity_component import async_update_entity
|
||||
|
||||
from tests.async_mock import MagicMock, PropertyMock, patch
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
TEST_CONFIG = {
|
||||
CONF_USERNAME: "test-username",
|
||||
CONF_PASSWORD: "test-password",
|
||||
CONF_PIN: "1234",
|
||||
}
|
||||
TEST_SITE_UUID = "test-site-uuid"
|
||||
TEST_SITE_NAME = "test-site-name"
|
||||
FIRST_ENTITY_ID = "binary_sensor.zone_0"
|
||||
SECOND_ENTITY_ID = "binary_sensor.zone_1"
|
||||
|
||||
|
||||
def _zone_mock():
|
||||
return MagicMock(triggered=False, bypassed=False,)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def two_zone_alarm():
|
||||
"""Fixture to mock alarm with two zones."""
|
||||
zone_mocks = {0: _zone_mock(), 1: _zone_mock()}
|
||||
alarm_mock = MagicMock()
|
||||
with patch.object(
|
||||
zone_mocks[0], "id", new_callable=PropertyMock(return_value=0)
|
||||
), patch.object(
|
||||
zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0")
|
||||
), patch.object(
|
||||
zone_mocks[1], "id", new_callable=PropertyMock(return_value=1)
|
||||
), patch.object(
|
||||
zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1")
|
||||
), patch.object(
|
||||
alarm_mock, "zones", new_callable=PropertyMock(return_value=zone_mocks),
|
||||
), patch(
|
||||
"homeassistant.components.risco.RiscoAPI.get_state", return_value=alarm_mock,
|
||||
):
|
||||
yield alarm_mock
|
||||
|
||||
|
||||
async def _setup_risco(hass):
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.risco.RiscoAPI.login", return_value=True,
|
||||
), patch(
|
||||
"homeassistant.components.risco.RiscoAPI.site_uuid",
|
||||
new_callable=PropertyMock(return_value=TEST_SITE_UUID),
|
||||
), patch(
|
||||
"homeassistant.components.risco.RiscoAPI.site_name",
|
||||
new_callable=PropertyMock(return_value=TEST_SITE_NAME),
|
||||
), patch(
|
||||
"homeassistant.components.risco.RiscoAPI.close"
|
||||
):
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
return config_entry
|
||||
|
||||
|
||||
async def test_cannot_connect(hass):
|
||||
"""Test connection error."""
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.risco.RiscoAPI.login", side_effect=CannotConnectError,
|
||||
):
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
|
||||
config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
registry = await hass.helpers.entity_registry.async_get_registry()
|
||||
assert not registry.async_is_registered(FIRST_ENTITY_ID)
|
||||
assert not registry.async_is_registered(SECOND_ENTITY_ID)
|
||||
|
||||
|
||||
async def test_unauthorized(hass):
|
||||
"""Test unauthorized error."""
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.risco.RiscoAPI.login", side_effect=UnauthorizedError,
|
||||
):
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
|
||||
config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
registry = await hass.helpers.entity_registry.async_get_registry()
|
||||
assert not registry.async_is_registered(FIRST_ENTITY_ID)
|
||||
assert not registry.async_is_registered(SECOND_ENTITY_ID)
|
||||
|
||||
|
||||
async def test_setup(hass, two_zone_alarm):
|
||||
"""Test entity setup."""
|
||||
registry = await hass.helpers.entity_registry.async_get_registry()
|
||||
|
||||
assert not registry.async_is_registered(FIRST_ENTITY_ID)
|
||||
assert not registry.async_is_registered(SECOND_ENTITY_ID)
|
||||
|
||||
await _setup_risco(hass)
|
||||
|
||||
assert registry.async_is_registered(FIRST_ENTITY_ID)
|
||||
assert registry.async_is_registered(SECOND_ENTITY_ID)
|
||||
|
||||
registry = await hass.helpers.device_registry.async_get_registry()
|
||||
device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_zone_0")}, {})
|
||||
assert device is not None
|
||||
assert device.manufacturer == "Risco"
|
||||
|
||||
device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_zone_1")}, {})
|
||||
assert device is not None
|
||||
assert device.manufacturer == "Risco"
|
||||
|
||||
|
||||
async def _check_state(hass, alarm, triggered, bypassed, entity_id, zone_id):
|
||||
with patch.object(
|
||||
alarm.zones[zone_id],
|
||||
"triggered",
|
||||
new_callable=PropertyMock(return_value=triggered),
|
||||
), patch.object(
|
||||
alarm.zones[zone_id],
|
||||
"bypassed",
|
||||
new_callable=PropertyMock(return_value=bypassed),
|
||||
):
|
||||
await async_update_entity(hass, entity_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
expected_triggered = STATE_ON if triggered else STATE_OFF
|
||||
assert hass.states.get(entity_id).state == expected_triggered
|
||||
assert hass.states.get(entity_id).attributes["bypassed"] == bypassed
|
||||
|
||||
|
||||
async def test_states(hass, two_zone_alarm):
|
||||
"""Test the various alarm states."""
|
||||
await _setup_risco(hass)
|
||||
|
||||
await _check_state(hass, two_zone_alarm, True, True, FIRST_ENTITY_ID, 0)
|
||||
await _check_state(hass, two_zone_alarm, True, False, FIRST_ENTITY_ID, 0)
|
||||
await _check_state(hass, two_zone_alarm, False, True, FIRST_ENTITY_ID, 0)
|
||||
await _check_state(hass, two_zone_alarm, False, False, FIRST_ENTITY_ID, 0)
|
||||
await _check_state(hass, two_zone_alarm, True, True, SECOND_ENTITY_ID, 1)
|
||||
await _check_state(hass, two_zone_alarm, True, False, SECOND_ENTITY_ID, 1)
|
||||
await _check_state(hass, two_zone_alarm, False, True, SECOND_ENTITY_ID, 1)
|
||||
await _check_state(hass, two_zone_alarm, False, False, SECOND_ENTITY_ID, 1)
|
Loading…
x
Reference in New Issue
Block a user