diff --git a/homeassistant/components/risco/__init__.py b/homeassistant/components/risco/__init__.py index a9a462bf916..f143244d31d 100644 --- a/homeassistant/components/risco/__init__.py +++ b/homeassistant/components/risco/__init__.py @@ -40,7 +40,12 @@ from .const import ( TYPE_LOCAL, ) -PLATFORMS = [Platform.ALARM_CONTROL_PANEL, Platform.BINARY_SENSOR, Platform.SENSOR] +PLATFORMS = [ + Platform.ALARM_CONTROL_PANEL, + Platform.BINARY_SENSOR, + Platform.SENSOR, + Platform.SWITCH, +] LAST_EVENT_STORAGE_VERSION = 1 LAST_EVENT_TIMESTAMP_KEY = "last_event_timestamp" _LOGGER = logging.getLogger(__name__) diff --git a/homeassistant/components/risco/alarm_control_panel.py b/homeassistant/components/risco/alarm_control_panel.py index 79da100d6e1..7f4241048d7 100644 --- a/homeassistant/components/risco/alarm_control_panel.py +++ b/homeassistant/components/risco/alarm_control_panel.py @@ -40,7 +40,7 @@ from .const import ( RISCO_GROUPS, RISCO_PARTIAL_ARM, ) -from .entity import RiscoEntity +from .entity import RiscoCloudEntity _LOGGER = logging.getLogger(__name__) @@ -178,7 +178,7 @@ class RiscoAlarm(AlarmControlPanelEntity): raise NotImplementedError -class RiscoCloudAlarm(RiscoAlarm, RiscoEntity): +class RiscoCloudAlarm(RiscoAlarm, RiscoCloudEntity): """Representation of a Risco partition.""" def __init__( diff --git a/homeassistant/components/risco/binary_sensor.py b/homeassistant/components/risco/binary_sensor.py index bc021c2c364..b1f55dd8693 100644 --- a/homeassistant/components/risco/binary_sensor.py +++ b/homeassistant/components/risco/binary_sensor.py @@ -12,21 +12,11 @@ from homeassistant.components.binary_sensor import ( ) from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant -from homeassistant.helpers import entity_platform -from homeassistant.helpers.dispatcher import async_dispatcher_connect -from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback -from . import LocalData, RiscoDataUpdateCoordinator, is_local, zone_update_signal +from . import LocalData, RiscoDataUpdateCoordinator, is_local from .const import DATA_COORDINATOR, DOMAIN -from .entity import RiscoEntity, binary_sensor_unique_id - -SERVICE_BYPASS_ZONE = "bypass_zone" -SERVICE_UNBYPASS_ZONE = "unbypass_zone" - - -def _unique_id_for_local(system_id: str, zone_id: int) -> str: - return f"{system_id}_zone_{zone_id}_local" +from .entity import RiscoCloudZoneEntity, RiscoLocalZoneEntity async def async_setup_entry( @@ -35,12 +25,6 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Set up the Risco alarm control panel.""" - platform = entity_platform.async_get_current_platform() - platform.async_register_entity_service(SERVICE_BYPASS_ZONE, {}, "async_bypass_zone") - platform.async_register_entity_service( - SERVICE_UNBYPASS_ZONE, {}, "async_unbypass_zone" - ) - if is_local(config_entry): local_data: LocalData = hass.data[DOMAIN][config_entry.entry_id] async_add_entities( @@ -61,85 +45,34 @@ async def async_setup_entry( ) -class RiscoBinarySensor(BinarySensorEntity): - """Representation of a Risco zone as a binary sensor.""" +class RiscoCloudBinarySensor(RiscoCloudZoneEntity, BinarySensorEntity): + """Representation of a Risco cloud zone as a binary sensor.""" _attr_device_class = BinarySensorDeviceClass.MOTION - def __init__(self, *, zone_id: int, zone: Zone, **kwargs: Any) -> None: + def __init__( + self, coordinator: RiscoDataUpdateCoordinator, zone_id: int, zone: Zone + ) -> None: """Init the zone.""" - super().__init__(**kwargs) - self._zone_id = zone_id - self._zone = zone - self._attr_has_entity_name = True - self._attr_name = None - - @property - def extra_state_attributes(self) -> Mapping[str, Any] | None: - """Return the state attributes.""" - return {"zone_id": self._zone_id, "bypassed": self._zone.bypassed} + super().__init__( + coordinator=coordinator, name=None, suffix="", zone_id=zone_id, zone=zone + ) @property def is_on(self) -> bool | None: """Return true if sensor is on.""" return self._zone.triggered - async def async_bypass_zone(self) -> None: - """Bypass this zone.""" - await self._bypass(True) - async def async_unbypass_zone(self) -> None: - """Unbypass this zone.""" - await self._bypass(False) - - async def _bypass(self, bypass: bool) -> None: - raise NotImplementedError - - -class RiscoCloudBinarySensor(RiscoBinarySensor, RiscoEntity): - """Representation of a Risco cloud zone as a binary sensor.""" - - def __init__( - self, coordinator: RiscoDataUpdateCoordinator, zone_id: int, zone: Zone - ) -> None: - """Init the zone.""" - super().__init__(zone_id=zone_id, zone=zone, coordinator=coordinator) - self._attr_unique_id = binary_sensor_unique_id(self._risco, zone_id) - self._attr_device_info = DeviceInfo( - identifiers={(DOMAIN, self._attr_unique_id)}, - manufacturer="Risco", - name=self._zone.name, - ) - - def _get_data_from_coordinator(self) -> None: - self._zone = self.coordinator.data.zones[self._zone_id] - - async def _bypass(self, bypass: bool) -> None: - alarm = await self._risco.bypass_zone(self._zone_id, bypass) - self._zone = alarm.zones[self._zone_id] - self.async_write_ha_state() - - -class RiscoLocalBinarySensor(RiscoBinarySensor): +class RiscoLocalBinarySensor(RiscoLocalZoneEntity, BinarySensorEntity): """Representation of a Risco local zone as a binary sensor.""" - _attr_should_poll = False + _attr_device_class = BinarySensorDeviceClass.MOTION def __init__(self, system_id: str, zone_id: int, zone: Zone) -> None: """Init the zone.""" - super().__init__(zone_id=zone_id, zone=zone) - self._attr_unique_id = _unique_id_for_local(system_id, zone_id) - self._attr_device_info = DeviceInfo( - identifiers={(DOMAIN, self._attr_unique_id)}, - manufacturer="Risco", - name=self._zone.name, - ) - - async def async_added_to_hass(self) -> None: - """Subscribe to updates.""" - signal = zone_update_signal(self._zone_id) - self.async_on_remove( - async_dispatcher_connect(self.hass, signal, self.async_write_ha_state) + super().__init__( + system_id=system_id, name=None, suffix="", zone_id=zone_id, zone=zone ) @property @@ -150,42 +83,27 @@ class RiscoLocalBinarySensor(RiscoBinarySensor): "groups": self._zone.groups, } - async def _bypass(self, bypass: bool) -> None: - await self._zone.bypass(bypass) + @property + def is_on(self) -> bool | None: + """Return true if sensor is on.""" + return self._zone.triggered -class RiscoLocalAlarmedBinarySensor(BinarySensorEntity): +class RiscoLocalAlarmedBinarySensor(RiscoLocalZoneEntity, BinarySensorEntity): """Representation whether a zone in Risco local is currently triggering an alarm.""" _attr_should_poll = False def __init__(self, system_id: str, zone_id: int, zone: Zone) -> None: """Init the zone.""" - super().__init__() - self._zone_id = zone_id - self._zone = zone - self._attr_has_entity_name = True - self._attr_name = "Alarmed" - device_unique_id = _unique_id_for_local(system_id, zone_id) - self._attr_unique_id = device_unique_id + "_alarmed" - self._attr_device_info = DeviceInfo( - identifiers={(DOMAIN, device_unique_id)}, - manufacturer="Risco", - name=self._zone.name, + super().__init__( + system_id=system_id, + name="Alarmed", + suffix="_alarmed", + zone_id=zone_id, + zone=zone, ) - async def async_added_to_hass(self) -> None: - """Subscribe to updates.""" - signal = zone_update_signal(self._zone_id) - self.async_on_remove( - async_dispatcher_connect(self.hass, signal, self.async_write_ha_state) - ) - - @property - def extra_state_attributes(self) -> Mapping[str, Any] | None: - """Return the state attributes.""" - return {"zone_id": self._zone_id} - @property def is_on(self) -> bool | None: """Return true if sensor is on.""" diff --git a/homeassistant/components/risco/entity.py b/homeassistant/components/risco/entity.py index e49b632ac78..a4ac260887c 100644 --- a/homeassistant/components/risco/entity.py +++ b/homeassistant/components/risco/entity.py @@ -1,25 +1,40 @@ """A risco entity base class.""" +from __future__ import annotations + +from typing import Any + +from pyrisco.common import Zone + +from homeassistant.helpers.dispatcher import async_dispatcher_connect +from homeassistant.helpers.entity import DeviceInfo, Entity from homeassistant.helpers.update_coordinator import CoordinatorEntity -from . import RiscoDataUpdateCoordinator +from . import RiscoDataUpdateCoordinator, zone_update_signal +from .const import DOMAIN -def binary_sensor_unique_id(risco, zone_id: int) -> str: - """Return unique id for the binary sensor.""" +def zone_unique_id(risco, zone_id: int) -> str: + """Return unique id for a cloud zone.""" return f"{risco.site_uuid}_zone_{zone_id}" -class RiscoEntity(CoordinatorEntity[RiscoDataUpdateCoordinator]): - """Risco entity base class.""" +class RiscoCloudEntity(CoordinatorEntity[RiscoDataUpdateCoordinator]): + """Risco cloud entity base class.""" - def _get_data_from_coordinator(self): + def __init__( + self, *, coordinator: RiscoDataUpdateCoordinator, **kwargs: Any + ) -> None: + """Init the entity.""" + super().__init__(coordinator=coordinator, **kwargs) + + def _get_data_from_coordinator(self) -> None: raise NotImplementedError - def _refresh_from_coordinator(self): + def _refresh_from_coordinator(self) -> None: self._get_data_from_coordinator() self.async_write_ha_state() - async def async_added_to_hass(self): + async def async_added_to_hass(self) -> None: """When entity is added to hass.""" self.async_on_remove( self.coordinator.async_add_listener(self._refresh_from_coordinator) @@ -29,3 +44,74 @@ class RiscoEntity(CoordinatorEntity[RiscoDataUpdateCoordinator]): def _risco(self): """Return the Risco API object.""" return self.coordinator.risco + + +class RiscoCloudZoneEntity(RiscoCloudEntity): + """Risco cloud zone entity base class.""" + + _attr_has_entity_name = True + + def __init__( + self, + *, + coordinator: RiscoDataUpdateCoordinator, + name: str | None, + suffix: str, + zone_id: int, + zone: Zone, + **kwargs: Any, + ) -> None: + """Init the zone.""" + super().__init__(coordinator=coordinator, **kwargs) + self._zone_id = zone_id + self._zone = zone + self._attr_name = name + device_unique_id = zone_unique_id(self._risco, zone_id) + self._attr_unique_id = f"{device_unique_id}{suffix}" + self._attr_device_info = DeviceInfo( + identifiers={(DOMAIN, device_unique_id)}, + manufacturer="Risco", + name=self._zone.name, + ) + self._attr_extra_state_attributes = {"zone_id": zone_id} + + def _get_data_from_coordinator(self) -> None: + self._zone = self.coordinator.data.zones[self._zone_id] + + +class RiscoLocalZoneEntity(Entity): + """Risco local zone entity base class.""" + + _attr_should_poll = False + _attr_has_entity_name = True + + def __init__( + self, + *, + system_id: str, + name: str | None, + suffix: str, + zone_id: int, + zone: Zone, + **kwargs: Any, + ) -> None: + """Init the zone.""" + super().__init__(**kwargs) + self._zone_id = zone_id + self._zone = zone + self._attr_name = name + device_unique_id = f"{system_id}_zone_{zone_id}_local" + self._attr_unique_id = f"{device_unique_id}{suffix}" + self._attr_device_info = DeviceInfo( + identifiers={(DOMAIN, device_unique_id)}, + manufacturer="Risco", + name=zone.name, + ) + self._attr_extra_state_attributes = {"zone_id": zone_id} + + async def async_added_to_hass(self) -> None: + """Subscribe to updates.""" + signal = zone_update_signal(self._zone_id) + self.async_on_remove( + async_dispatcher_connect(self.hass, signal, self.async_write_ha_state) + ) diff --git a/homeassistant/components/risco/sensor.py b/homeassistant/components/risco/sensor.py index c4bd047e260..f2cb9821166 100644 --- a/homeassistant/components/risco/sensor.py +++ b/homeassistant/components/risco/sensor.py @@ -15,7 +15,7 @@ from homeassistant.util import dt as dt_util from . import RiscoEventsDataUpdateCoordinator, is_local from .const import DOMAIN, EVENTS_COORDINATOR -from .entity import binary_sensor_unique_id +from .entity import zone_unique_id CATEGORIES = { 2: "Alarm", @@ -115,11 +115,9 @@ class RiscoSensor(CoordinatorEntity, SensorEntity): attrs = {atr: getattr(self._event, atr, None) for atr in EVENT_ATTRIBUTES} if self._event.zone_id is not None: - zone_unique_id = binary_sensor_unique_id( - self.coordinator.risco, self._event.zone_id - ) + uid = zone_unique_id(self.coordinator.risco, self._event.zone_id) zone_entity_id = self._entity_registry.async_get_entity_id( - BS_DOMAIN, DOMAIN, zone_unique_id + BS_DOMAIN, DOMAIN, uid ) if zone_entity_id is not None: attrs["zone_entity_id"] = zone_entity_id diff --git a/homeassistant/components/risco/services.yaml b/homeassistant/components/risco/services.yaml deleted file mode 100644 index c271df7b462..00000000000 --- a/homeassistant/components/risco/services.yaml +++ /dev/null @@ -1,17 +0,0 @@ -# Describes the format for available Risco services - -bypass_zone: - name: Bypass zone - description: Bypass a Risco Zone - target: - entity: - integration: risco - domain: binary_sensor - -unbypass_zone: - name: Unbypass zone - description: Unbypass a Risco Zone - target: - entity: - integration: risco - domain: binary_sensor diff --git a/homeassistant/components/risco/switch.py b/homeassistant/components/risco/switch.py new file mode 100644 index 00000000000..2ed07b9f34b --- /dev/null +++ b/homeassistant/components/risco/switch.py @@ -0,0 +1,104 @@ +"""Support for bypassing Risco alarm zones.""" +from __future__ import annotations + +from pyrisco.common import Zone + +from homeassistant.components.switch import SwitchEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity import EntityCategory +from homeassistant.helpers.entity_platform import AddEntitiesCallback + +from . import LocalData, RiscoDataUpdateCoordinator, is_local +from .const import DATA_COORDINATOR, DOMAIN +from .entity import RiscoCloudZoneEntity, RiscoLocalZoneEntity + + +async def async_setup_entry( + hass: HomeAssistant, + config_entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up the Risco switch.""" + if is_local(config_entry): + local_data: LocalData = hass.data[DOMAIN][config_entry.entry_id] + async_add_entities( + RiscoLocalSwitch(local_data.system.id, zone_id, zone) + for zone_id, zone in local_data.system.zones.items() + ) + else: + coordinator: RiscoDataUpdateCoordinator = hass.data[DOMAIN][ + config_entry.entry_id + ][DATA_COORDINATOR] + async_add_entities( + RiscoCloudSwitch(coordinator, zone_id, zone) + for zone_id, zone in coordinator.data.zones.items() + ) + + +class RiscoCloudSwitch(RiscoCloudZoneEntity, SwitchEntity): + """Representation of a bypass switch for a Risco cloud zone.""" + + _attr_entity_category = EntityCategory.CONFIG + + def __init__( + self, coordinator: RiscoDataUpdateCoordinator, zone_id: int, zone: Zone + ) -> None: + """Init the zone.""" + super().__init__( + coordinator=coordinator, + name="Bypassed", + suffix="_bypassed", + zone_id=zone_id, + zone=zone, + ) + + @property + def is_on(self) -> bool | None: + """Return true if the zone is bypassed.""" + return self._zone.bypassed + + async def async_turn_on(self, **kwargs): + """Turn the entity on.""" + await self._bypass(True) + + async def async_turn_off(self, **kwargs): + """Turn the entity off.""" + await self._bypass(False) + + async def _bypass(self, bypass: bool) -> None: + alarm = await self._risco.bypass_zone(self._zone_id, bypass) + self._zone = alarm.zones[self._zone_id] + self.async_write_ha_state() + + +class RiscoLocalSwitch(RiscoLocalZoneEntity, SwitchEntity): + """Representation of a bypass switch for a Risco local zone.""" + + _attr_entity_category = EntityCategory.CONFIG + + def __init__(self, system_id: str, zone_id: int, zone: Zone) -> None: + """Init the zone.""" + super().__init__( + system_id=system_id, + name="Bypassed", + suffix="_bypassed", + zone_id=zone_id, + zone=zone, + ) + + @property + def is_on(self) -> bool | None: + """Return true if the zone is bypassed.""" + return self._zone.bypassed + + async def async_turn_on(self, **kwargs): + """Turn the entity on.""" + await self._bypass(True) + + async def async_turn_off(self, **kwargs): + """Turn the entity off.""" + await self._bypass(False) + + async def _bypass(self, bypass: bool) -> None: + await self._zone.bypass(bypass) diff --git a/tests/components/risco/conftest.py b/tests/components/risco/conftest.py index b22768a1cd0..cc65efd9b55 100644 --- a/tests/components/risco/conftest.py +++ b/tests/components/risco/conftest.py @@ -39,10 +39,14 @@ def two_zone_cloud(): zone_mocks[0], "id", new_callable=PropertyMock(return_value=0) ), patch.object( zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0") + ), patch.object( + zone_mocks[0], "bypassed", new_callable=PropertyMock(return_value=False) ), patch.object( zone_mocks[1], "id", new_callable=PropertyMock(return_value=1) ), patch.object( zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1") + ), patch.object( + zone_mocks[1], "bypassed", new_callable=PropertyMock(return_value=False) ), patch.object( alarm_mock, "zones", @@ -54,6 +58,36 @@ def two_zone_cloud(): yield zone_mocks +@fixture +def two_zone_local(): + """Fixture to mock alarm with two zones.""" + zone_mocks = {0: zone_mock(), 1: zone_mock()} + with patch.object( + zone_mocks[0], "id", new_callable=PropertyMock(return_value=0) + ), patch.object( + zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0") + ), patch.object( + zone_mocks[0], "alarmed", new_callable=PropertyMock(return_value=False) + ), patch.object( + zone_mocks[0], "bypassed", new_callable=PropertyMock(return_value=False) + ), patch.object( + zone_mocks[1], "id", new_callable=PropertyMock(return_value=1) + ), patch.object( + zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1") + ), patch.object( + zone_mocks[1], "alarmed", new_callable=PropertyMock(return_value=False) + ), patch.object( + zone_mocks[1], "bypassed", new_callable=PropertyMock(return_value=False) + ), patch( + "homeassistant.components.risco.RiscoLocal.partitions", + new_callable=PropertyMock(return_value={}), + ), patch( + "homeassistant.components.risco.RiscoLocal.zones", + new_callable=PropertyMock(return_value=zone_mocks), + ): + yield zone_mocks + + @fixture def options(): """Fixture for default (empty) options.""" diff --git a/tests/components/risco/test_binary_sensor.py b/tests/components/risco/test_binary_sensor.py index 71cbd04f391..00d10f6059e 100644 --- a/tests/components/risco/test_binary_sensor.py +++ b/tests/components/risco/test_binary_sensor.py @@ -9,7 +9,7 @@ from homeassistant.const import STATE_OFF, STATE_ON from homeassistant.helpers import device_registry as dr, entity_registry as er from homeassistant.helpers.entity_component import async_update_entity -from .util import TEST_SITE_UUID, zone_mock +from .util import TEST_SITE_UUID FIRST_ENTITY_ID = "binary_sensor.zone_0" SECOND_ENTITY_ID = "binary_sensor.zone_1" @@ -17,32 +17,6 @@ FIRST_ALARMED_ENTITY_ID = FIRST_ENTITY_ID + "_alarmed" SECOND_ALARMED_ENTITY_ID = SECOND_ENTITY_ID + "_alarmed" -@pytest.fixture -def two_zone_local(): - """Fixture to mock alarm with two zones.""" - zone_mocks = {0: zone_mock(), 1: zone_mock()} - with patch.object( - zone_mocks[0], "id", new_callable=PropertyMock(return_value=0) - ), patch.object( - zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0") - ), patch.object( - zone_mocks[0], "alarmed", new_callable=PropertyMock(return_value=False) - ), patch.object( - zone_mocks[1], "id", new_callable=PropertyMock(return_value=1) - ), patch.object( - zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1") - ), patch.object( - zone_mocks[1], "alarmed", new_callable=PropertyMock(return_value=False) - ), patch( - "homeassistant.components.risco.RiscoLocal.partitions", - new_callable=PropertyMock(return_value={}), - ), patch( - "homeassistant.components.risco.RiscoLocal.zones", - new_callable=PropertyMock(return_value=zone_mocks), - ): - yield zone_mocks - - @pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) async def test_error_on_login(hass, login_with_error, cloud_config_entry): """Test error on login.""" @@ -69,59 +43,26 @@ async def test_cloud_setup(hass, two_zone_cloud, setup_risco_cloud): assert device.manufacturer == "Risco" -async def _check_cloud_state(hass, zones, triggered, bypassed, entity_id, zone_id): +async def _check_cloud_state(hass, zones, triggered, entity_id, zone_id): with patch.object( zones[zone_id], "triggered", new_callable=PropertyMock(return_value=triggered), - ), patch.object( - zones[zone_id], - "bypassed", - new_callable=PropertyMock(return_value=bypassed), ): await async_update_entity(hass, entity_id) await hass.async_block_till_done() expected_triggered = STATE_ON if triggered else STATE_OFF assert hass.states.get(entity_id).state == expected_triggered - assert hass.states.get(entity_id).attributes["bypassed"] == bypassed assert hass.states.get(entity_id).attributes["zone_id"] == zone_id async def test_cloud_states(hass, two_zone_cloud, setup_risco_cloud): """Test the various alarm states.""" - await _check_cloud_state(hass, two_zone_cloud, True, True, FIRST_ENTITY_ID, 0) - await _check_cloud_state(hass, two_zone_cloud, True, False, FIRST_ENTITY_ID, 0) - await _check_cloud_state(hass, two_zone_cloud, False, True, FIRST_ENTITY_ID, 0) - await _check_cloud_state(hass, two_zone_cloud, False, False, FIRST_ENTITY_ID, 0) - await _check_cloud_state(hass, two_zone_cloud, True, True, SECOND_ENTITY_ID, 1) - await _check_cloud_state(hass, two_zone_cloud, True, False, SECOND_ENTITY_ID, 1) - await _check_cloud_state(hass, two_zone_cloud, False, True, SECOND_ENTITY_ID, 1) - await _check_cloud_state(hass, two_zone_cloud, False, False, SECOND_ENTITY_ID, 1) - - -async def test_cloud_bypass(hass, two_zone_cloud, setup_risco_cloud): - """Test bypassing a zone.""" - with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock: - data = {"entity_id": FIRST_ENTITY_ID} - - await hass.services.async_call( - DOMAIN, "bypass_zone", service_data=data, blocking=True - ) - - mock.assert_awaited_once_with(0, True) - - -async def test_cloud_unbypass(hass, two_zone_cloud, setup_risco_cloud): - """Test unbypassing a zone.""" - with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock: - data = {"entity_id": FIRST_ENTITY_ID} - - await hass.services.async_call( - DOMAIN, "unbypass_zone", service_data=data, blocking=True - ) - - mock.assert_awaited_once_with(0, False) + await _check_cloud_state(hass, two_zone_cloud, True, FIRST_ENTITY_ID, 0) + await _check_cloud_state(hass, two_zone_cloud, False, FIRST_ENTITY_ID, 0) + await _check_cloud_state(hass, two_zone_cloud, True, SECOND_ENTITY_ID, 1) + await _check_cloud_state(hass, two_zone_cloud, False, SECOND_ENTITY_ID, 1) @pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) @@ -154,24 +95,17 @@ async def test_local_setup(hass, two_zone_local, setup_risco_local): assert device.manufacturer == "Risco" -async def _check_local_state( - hass, zones, triggered, bypassed, entity_id, zone_id, callback -): +async def _check_local_state(hass, zones, triggered, entity_id, zone_id, callback): with patch.object( zones[zone_id], "triggered", new_callable=PropertyMock(return_value=triggered), - ), patch.object( - zones[zone_id], - "bypassed", - new_callable=PropertyMock(return_value=bypassed), ): await callback(zone_id, zones[zone_id]) await hass.async_block_till_done() expected_triggered = STATE_ON if triggered else STATE_OFF assert hass.states.get(entity_id).state == expected_triggered - assert hass.states.get(entity_id).attributes["bypassed"] == bypassed assert hass.states.get(entity_id).attributes["zone_id"] == zone_id @@ -205,30 +139,10 @@ async def test_local_states( assert callback is not None - await _check_local_state( - hass, two_zone_local, True, True, FIRST_ENTITY_ID, 0, callback - ) - await _check_local_state( - hass, two_zone_local, True, False, FIRST_ENTITY_ID, 0, callback - ) - await _check_local_state( - hass, two_zone_local, False, True, FIRST_ENTITY_ID, 0, callback - ) - await _check_local_state( - hass, two_zone_local, False, False, FIRST_ENTITY_ID, 0, callback - ) - await _check_local_state( - hass, two_zone_local, True, True, SECOND_ENTITY_ID, 1, callback - ) - await _check_local_state( - hass, two_zone_local, True, False, SECOND_ENTITY_ID, 1, callback - ) - await _check_local_state( - hass, two_zone_local, False, True, SECOND_ENTITY_ID, 1, callback - ) - await _check_local_state( - hass, two_zone_local, False, False, SECOND_ENTITY_ID, 1, callback - ) + await _check_local_state(hass, two_zone_local, True, FIRST_ENTITY_ID, 0, callback) + await _check_local_state(hass, two_zone_local, False, FIRST_ENTITY_ID, 0, callback) + await _check_local_state(hass, two_zone_local, True, SECOND_ENTITY_ID, 1, callback) + await _check_local_state(hass, two_zone_local, False, SECOND_ENTITY_ID, 1, callback) async def test_alarmed_local_states( @@ -251,27 +165,3 @@ async def test_alarmed_local_states( await _check_alarmed_local_state( hass, two_zone_local, False, SECOND_ALARMED_ENTITY_ID, 1, callback ) - - -async def test_local_bypass(hass, two_zone_local, setup_risco_local): - """Test bypassing a zone.""" - with patch.object(two_zone_local[0], "bypass") as mock: - data = {"entity_id": FIRST_ENTITY_ID} - - await hass.services.async_call( - DOMAIN, "bypass_zone", service_data=data, blocking=True - ) - - mock.assert_awaited_once_with(True) - - -async def test_local_unbypass(hass, two_zone_local, setup_risco_local): - """Test unbypassing a zone.""" - with patch.object(two_zone_local[0], "bypass") as mock: - data = {"entity_id": FIRST_ENTITY_ID} - - await hass.services.async_call( - DOMAIN, "unbypass_zone", service_data=data, blocking=True - ) - - mock.assert_awaited_once_with(False) diff --git a/tests/components/risco/test_switch.py b/tests/components/risco/test_switch.py new file mode 100644 index 00000000000..5ea4e72abca --- /dev/null +++ b/tests/components/risco/test_switch.py @@ -0,0 +1,151 @@ +"""Tests for the Risco binary sensors.""" +from unittest.mock import PropertyMock, patch + +import pytest + +from homeassistant.components.risco import CannotConnectError, UnauthorizedError +from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN +from homeassistant.const import SERVICE_TURN_OFF, SERVICE_TURN_ON, STATE_OFF, STATE_ON +from homeassistant.helpers import entity_registry as er +from homeassistant.helpers.entity_component import async_update_entity + +FIRST_ENTITY_ID = "switch.zone_0_bypassed" +SECOND_ENTITY_ID = "switch.zone_1_bypassed" + + +@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) +async def test_error_on_login(hass, login_with_error, cloud_config_entry): + """Test error on login.""" + await hass.config_entries.async_setup(cloud_config_entry.entry_id) + await hass.async_block_till_done() + registry = er.async_get(hass) + assert not registry.async_is_registered(FIRST_ENTITY_ID) + assert not registry.async_is_registered(SECOND_ENTITY_ID) + + +async def test_cloud_setup(hass, two_zone_cloud, setup_risco_cloud): + """Test entity setup.""" + registry = er.async_get(hass) + assert registry.async_is_registered(FIRST_ENTITY_ID) + assert registry.async_is_registered(SECOND_ENTITY_ID) + + +async def _check_cloud_state(hass, zones, bypassed, entity_id, zone_id): + with patch.object( + zones[zone_id], + "bypassed", + new_callable=PropertyMock(return_value=bypassed), + ): + await async_update_entity(hass, entity_id) + await hass.async_block_till_done() + + expected_bypassed = STATE_ON if bypassed else STATE_OFF + assert hass.states.get(entity_id).state == expected_bypassed + assert hass.states.get(entity_id).attributes["zone_id"] == zone_id + + +async def test_cloud_states(hass, two_zone_cloud, setup_risco_cloud): + """Test the various alarm states.""" + await _check_cloud_state(hass, two_zone_cloud, True, FIRST_ENTITY_ID, 0) + await _check_cloud_state(hass, two_zone_cloud, False, FIRST_ENTITY_ID, 0) + await _check_cloud_state(hass, two_zone_cloud, True, SECOND_ENTITY_ID, 1) + await _check_cloud_state(hass, two_zone_cloud, False, SECOND_ENTITY_ID, 1) + + +async def test_cloud_bypass(hass, two_zone_cloud, setup_risco_cloud): + """Test bypassing a zone.""" + with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock: + data = {"entity_id": FIRST_ENTITY_ID} + + await hass.services.async_call( + SWITCH_DOMAIN, SERVICE_TURN_ON, service_data=data, blocking=True + ) + + mock.assert_awaited_once_with(0, True) + + +async def test_cloud_unbypass(hass, two_zone_cloud, setup_risco_cloud): + """Test unbypassing a zone.""" + with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock: + data = {"entity_id": FIRST_ENTITY_ID} + + await hass.services.async_call( + SWITCH_DOMAIN, SERVICE_TURN_OFF, service_data=data, blocking=True + ) + + mock.assert_awaited_once_with(0, False) + + +@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError]) +async def test_error_on_connect(hass, connect_with_error, local_config_entry): + """Test error on connect.""" + await hass.config_entries.async_setup(local_config_entry.entry_id) + await hass.async_block_till_done() + registry = er.async_get(hass) + assert not registry.async_is_registered(FIRST_ENTITY_ID) + assert not registry.async_is_registered(SECOND_ENTITY_ID) + + +async def test_local_setup(hass, two_zone_local, setup_risco_local): + """Test entity setup.""" + registry = er.async_get(hass) + assert registry.async_is_registered(FIRST_ENTITY_ID) + assert registry.async_is_registered(SECOND_ENTITY_ID) + + +async def _check_local_state(hass, zones, bypassed, entity_id, zone_id, callback): + with patch.object( + zones[zone_id], + "bypassed", + new_callable=PropertyMock(return_value=bypassed), + ): + await callback(zone_id, zones[zone_id]) + await hass.async_block_till_done() + + expected_bypassed = STATE_ON if bypassed else STATE_OFF + assert hass.states.get(entity_id).state == expected_bypassed + assert hass.states.get(entity_id).attributes["zone_id"] == zone_id + + +@pytest.fixture +def _mock_zone_handler(): + with patch("homeassistant.components.risco.RiscoLocal.add_zone_handler") as mock: + yield mock + + +async def test_local_states( + hass, two_zone_local, _mock_zone_handler, setup_risco_local +): + """Test the various alarm states.""" + callback = _mock_zone_handler.call_args.args[0] + + assert callback is not None + + await _check_local_state(hass, two_zone_local, True, FIRST_ENTITY_ID, 0, callback) + await _check_local_state(hass, two_zone_local, False, FIRST_ENTITY_ID, 0, callback) + await _check_local_state(hass, two_zone_local, True, SECOND_ENTITY_ID, 1, callback) + await _check_local_state(hass, two_zone_local, False, SECOND_ENTITY_ID, 1, callback) + + +async def test_local_bypass(hass, two_zone_local, setup_risco_local): + """Test bypassing a zone.""" + with patch.object(two_zone_local[0], "bypass") as mock: + data = {"entity_id": FIRST_ENTITY_ID} + + await hass.services.async_call( + SWITCH_DOMAIN, SERVICE_TURN_ON, service_data=data, blocking=True + ) + + mock.assert_awaited_once_with(True) + + +async def test_local_unbypass(hass, two_zone_local, setup_risco_local): + """Test unbypassing a zone.""" + with patch.object(two_zone_local[0], "bypass") as mock: + data = {"entity_id": FIRST_ENTITY_ID} + + await hass.services.async_call( + SWITCH_DOMAIN, SERVICE_TURN_OFF, service_data=data, blocking=True + ) + + mock.assert_awaited_once_with(False)