Support for local push in Risco integration (#75874)

* Local config flow

* Local entities

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Address code review comments

* More type hints

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* More annotations

* Even more annonations

* New entity naming

* Move fixtures to conftest

* Improve state tests for local

* Remove mutable default arguments

* Remove assertions for lack of state

* Add missing file

* Switch setup to fixtures

* Use error fixtures in test_config_flow

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
On Freund 2022-08-24 14:09:54 +03:00 committed by GitHub
parent 2497ff5a39
commit 635eda584d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1596 additions and 480 deletions

View File

@ -1,14 +1,27 @@
"""The Risco integration."""
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import timedelta
import logging
from typing import Any
from pyrisco import CannotConnectError, OperationError, RiscoCloud, UnauthorizedError
from pyrisco import (
CannotConnectError,
OperationError,
RiscoCloud,
RiscoLocal,
UnauthorizedError,
)
from pyrisco.common import Partition, Zone
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PIN,
CONF_PORT,
CONF_SCAN_INTERVAL,
CONF_TYPE,
CONF_USERNAME,
Platform,
)
@ -18,17 +31,94 @@ from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DATA_COORDINATOR, DEFAULT_SCAN_INTERVAL, DOMAIN, EVENTS_COORDINATOR
from .const import (
DATA_COORDINATOR,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
EVENTS_COORDINATOR,
TYPE_LOCAL,
)
PLATFORMS = [Platform.ALARM_CONTROL_PANEL, Platform.BINARY_SENSOR, Platform.SENSOR]
UNDO_UPDATE_LISTENER = "undo_update_listener"
LAST_EVENT_STORAGE_VERSION = 1
LAST_EVENT_TIMESTAMP_KEY = "last_event_timestamp"
_LOGGER = logging.getLogger(__name__)
@dataclass
class LocalData:
"""A data class for local data passed to the platforms."""
system: RiscoLocal
zone_updates: dict[int, Callable[[], Any]] = field(default_factory=dict)
partition_updates: dict[int, Callable[[], Any]] = field(default_factory=dict)
def is_local(entry: ConfigEntry) -> bool:
"""Return whether the entry represents an instance with local communication."""
return entry.data.get(CONF_TYPE) == TYPE_LOCAL
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Risco from a config entry."""
if is_local(entry):
return await _async_setup_local_entry(hass, entry)
return await _async_setup_cloud_entry(hass, entry)
async def _async_setup_local_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
data = entry.data
risco = RiscoLocal(data[CONF_HOST], data[CONF_PORT], data[CONF_PIN])
try:
await risco.connect()
except CannotConnectError as error:
raise ConfigEntryNotReady() from error
except UnauthorizedError:
_LOGGER.exception("Failed to login to Risco cloud")
return False
async def _error(error: Exception) -> None:
_LOGGER.error("Error in Risco library: %s", error)
entry.async_on_unload(risco.add_error_handler(_error))
async def _default(command: str, result: str, *params: list[str]) -> None:
_LOGGER.debug(
"Unhandled update from Risco library: %s, %s, %s", command, result, params
)
entry.async_on_unload(risco.add_default_handler(_default))
local_data = LocalData(risco)
async def _zone(zone_id: int, zone: Zone) -> None:
_LOGGER.debug("Risco zone update for %d", zone_id)
callback = local_data.zone_updates.get(zone_id)
if callback:
callback()
entry.async_on_unload(risco.add_zone_handler(_zone))
async def _partition(partition_id: int, partition: Partition) -> None:
_LOGGER.debug("Risco partition update for %d", partition_id)
callback = local_data.partition_updates.get(partition_id)
if callback:
callback()
entry.async_on_unload(risco.add_partition_handler(_partition))
entry.async_on_unload(entry.add_update_listener(_update_listener))
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = local_data
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def _async_setup_cloud_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
data = entry.data
risco = RiscoCloud(data[CONF_USERNAME], data[CONF_PASSWORD], data[CONF_PIN])
try:
@ -46,12 +136,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass, risco, entry.entry_id, 60
)
undo_listener = entry.add_update_listener(_update_listener)
entry.async_on_unload(entry.add_update_listener(_update_listener))
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = {
DATA_COORDINATOR: coordinator,
UNDO_UPDATE_LISTENER: undo_listener,
EVENTS_COORDINATOR: events_coordinator,
}
@ -65,7 +154,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN][entry.entry_id][UNDO_UPDATE_LISTENER]()
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok

View File

@ -1,7 +1,11 @@
"""Support for Risco alarms."""
from __future__ import annotations
from collections.abc import Callable
import logging
from typing import Any
from pyrisco.common import Partition
from homeassistant.components.alarm_control_panel import (
AlarmControlPanelEntity,
@ -23,6 +27,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import LocalData, RiscoDataUpdateCoordinator, is_local
from .const import (
CONF_CODE_ARM_REQUIRED,
CONF_CODE_DISARM_REQUIRED,
@ -53,57 +58,61 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the Risco alarm control panel."""
coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR]
options = {**DEFAULT_OPTIONS, **config_entry.options}
entities = [
RiscoAlarm(coordinator, partition_id, config_entry.data[CONF_PIN], options)
for partition_id in coordinator.data.partitions
]
async_add_entities(entities, False)
if is_local(config_entry):
local_data: LocalData = hass.data[DOMAIN][config_entry.entry_id]
async_add_entities(
RiscoLocalAlarm(
local_data.system.id,
partition_id,
partition,
local_data.partition_updates,
config_entry.data[CONF_PIN],
options,
)
for partition_id, partition in local_data.system.partitions.items()
)
else:
coordinator: RiscoDataUpdateCoordinator = hass.data[DOMAIN][
config_entry.entry_id
][DATA_COORDINATOR]
async_add_entities(
RiscoCloudAlarm(
coordinator, partition_id, config_entry.data[CONF_PIN], options
)
for partition_id in coordinator.data.partitions
)
class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity):
"""Representation of a Risco partition."""
class RiscoAlarm(AlarmControlPanelEntity):
"""Representation of a Risco cloud partition."""
_attr_code_format = CodeFormat.NUMBER
def __init__(self, coordinator, partition_id, code, options):
def __init__(
self,
*,
partition_id: int,
partition: Partition,
code: str,
options: dict[str, Any],
**kwargs: Any,
) -> None:
"""Init the partition."""
super().__init__(coordinator)
super().__init__(**kwargs)
self._partition_id = partition_id
self._partition = self.coordinator.data.partitions[self._partition_id]
self._partition = partition
self._code = code
self._attr_code_arm_required = options[CONF_CODE_ARM_REQUIRED]
self._code_disarm_required = options[CONF_CODE_DISARM_REQUIRED]
self._risco_to_ha = options[CONF_RISCO_STATES_TO_HA]
self._ha_to_risco = options[CONF_HA_STATES_TO_RISCO]
self._attr_supported_features = 0
self._attr_has_entity_name = True
self._attr_name = None
for state in self._ha_to_risco:
self._attr_supported_features |= STATES_TO_SUPPORTED_FEATURES[state]
def _get_data_from_coordinator(self):
self._partition = self.coordinator.data.partitions[self._partition_id]
@property
def device_info(self) -> DeviceInfo:
"""Return device info for this device."""
return DeviceInfo(
identifiers={(DOMAIN, self.unique_id)},
name=self.name,
manufacturer="Risco",
)
@property
def name(self) -> str:
"""Return the name of the partition."""
return f"Risco {self._risco.site_name} Partition {self._partition_id}"
@property
def unique_id(self) -> str:
"""Return a unique id for that partition."""
return f"{self._risco.site_uuid}_{self._partition_id}"
@property
def state(self) -> str | None:
"""Return the state of the device."""
@ -165,7 +174,74 @@ class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity):
else:
await self._call_alarm_method(risco_state)
async def _call_alarm_method(self, method: str, *args: Any) -> None:
raise NotImplementedError
class RiscoCloudAlarm(RiscoAlarm, RiscoEntity):
"""Representation of a Risco partition."""
def __init__(
self,
coordinator: RiscoDataUpdateCoordinator,
partition_id: int,
code: str,
options: dict[str, Any],
) -> None:
"""Init the partition."""
super().__init__(
partition_id=partition_id,
partition=coordinator.data.partitions[partition_id],
coordinator=coordinator,
code=code,
options=options,
)
self._attr_unique_id = f"{self._risco.site_uuid}_{partition_id}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self._attr_unique_id)},
name=f"Risco {self._risco.site_name} Partition {partition_id}",
manufacturer="Risco",
)
def _get_data_from_coordinator(self) -> None:
self._partition = self.coordinator.data.partitions[self._partition_id]
async def _call_alarm_method(self, method, *args):
alarm = await getattr(self._risco, method)(self._partition_id, *args)
self._partition = alarm.partitions[self._partition_id]
self.async_write_ha_state()
class RiscoLocalAlarm(RiscoAlarm):
"""Representation of a Risco local, partition."""
_attr_should_poll = False
def __init__(
self,
system_id: str,
partition_id: int,
partition: Partition,
partition_updates: dict[int, Callable[[], Any]],
code: str,
options: dict[str, Any],
) -> None:
"""Init the partition."""
super().__init__(
partition_id=partition_id, partition=partition, code=code, options=options
)
self._system_id = system_id
self._partition_updates = partition_updates
self._attr_unique_id = f"{system_id}_{partition_id}_local"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self._attr_unique_id)},
name=f"Risco {system_id} Partition {partition_id}",
manufacturer="Risco",
)
async def async_added_to_hass(self) -> None:
"""Subscribe to updates."""
self._partition_updates[self._partition_id] = self.async_write_ha_state
async def _call_alarm_method(self, method: str, *args: Any) -> None:
await getattr(self._partition, method)(*args)

View File

@ -1,4 +1,11 @@
"""Support for Risco alarm zones."""
from __future__ import annotations
from collections.abc import Callable, Mapping
from typing import Any
from pyrisco.common import Zone
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
@ -9,6 +16,7 @@ from homeassistant.helpers import entity_platform
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import LocalData, RiscoDataUpdateCoordinator, is_local
from .const import DATA_COORDINATOR, DOMAIN
from .entity import RiscoEntity, binary_sensor_unique_id
@ -28,69 +36,117 @@ async def async_setup_entry(
SERVICE_UNBYPASS_ZONE, {}, "async_unbypass_zone"
)
coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR]
entities = [
RiscoBinarySensor(coordinator, zone_id, zone)
for zone_id, zone in coordinator.data.zones.items()
]
async_add_entities(entities, False)
class RiscoBinarySensor(BinarySensorEntity, RiscoEntity):
"""Representation of a Risco zone as a binary sensor."""
def __init__(self, coordinator, zone_id, zone):
"""Init the zone."""
super().__init__(coordinator)
self._zone_id = zone_id
self._zone = zone
def _get_data_from_coordinator(self):
self._zone = self.coordinator.data.zones[self._zone_id]
@property
def device_info(self) -> DeviceInfo:
"""Return device info for this device."""
return DeviceInfo(
identifiers={(DOMAIN, self.unique_id)},
manufacturer="Risco",
name=self.name,
if is_local(config_entry):
local_data: LocalData = hass.data[DOMAIN][config_entry.entry_id]
async_add_entities(
RiscoLocalBinarySensor(
local_data.system.id, zone_id, zone, local_data.zone_updates
)
for zone_id, zone in local_data.system.zones.items()
)
else:
coordinator: RiscoDataUpdateCoordinator = hass.data[DOMAIN][
config_entry.entry_id
][DATA_COORDINATOR]
async_add_entities(
RiscoCloudBinarySensor(coordinator, zone_id, zone)
for zone_id, zone in coordinator.data.zones.items()
)
@property
def name(self):
"""Return the name of the zone."""
return self._zone.name
class RiscoBinarySensor(BinarySensorEntity):
"""Representation of a Risco zone as a binary sensor."""
_attr_device_class = BinarySensorDeviceClass.MOTION
def __init__(self, *, zone_id: int, zone: Zone, **kwargs: Any) -> None:
"""Init the zone."""
super().__init__(**kwargs)
self._zone_id = zone_id
self._zone = zone
self._attr_has_entity_name = True
self._attr_name = None
@property
def unique_id(self):
"""Return a unique id for this zone."""
return binary_sensor_unique_id(self._risco, self._zone_id)
@property
def extra_state_attributes(self):
def extra_state_attributes(self) -> Mapping[str, Any] | None:
"""Return the state attributes."""
return {"zone_id": self._zone_id, "bypassed": self._zone.bypassed}
@property
def is_on(self):
def is_on(self) -> bool | None:
"""Return true if sensor is on."""
return self._zone.triggered
@property
def device_class(self):
"""Return the class of this sensor, from BinarySensorDeviceClass."""
return BinarySensorDeviceClass.MOTION
async def async_bypass_zone(self) -> None:
"""Bypass this zone."""
await self._bypass(True)
async def _bypass(self, bypass):
async def async_unbypass_zone(self) -> None:
"""Unbypass this zone."""
await self._bypass(False)
async def _bypass(self, bypass: bool) -> None:
raise NotImplementedError
class RiscoCloudBinarySensor(RiscoBinarySensor, RiscoEntity):
"""Representation of a Risco cloud zone as a binary sensor."""
def __init__(
self, coordinator: RiscoDataUpdateCoordinator, zone_id: int, zone: Zone
) -> None:
"""Init the zone."""
super().__init__(zone_id=zone_id, zone=zone, coordinator=coordinator)
self._attr_unique_id = binary_sensor_unique_id(self._risco, zone_id)
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self._attr_unique_id)},
manufacturer="Risco",
name=self._zone.name,
)
def _get_data_from_coordinator(self) -> None:
self._zone = self.coordinator.data.zones[self._zone_id]
async def _bypass(self, bypass: bool) -> None:
alarm = await self._risco.bypass_zone(self._zone_id, bypass)
self._zone = alarm.zones[self._zone_id]
self.async_write_ha_state()
async def async_bypass_zone(self):
"""Bypass this zone."""
await self._bypass(True)
async def async_unbypass_zone(self):
"""Unbypass this zone."""
await self._bypass(False)
class RiscoLocalBinarySensor(RiscoBinarySensor):
"""Representation of a Risco local zone as a binary sensor."""
_attr_should_poll = False
def __init__(
self,
system_id: str,
zone_id: int,
zone: Zone,
zone_updates: dict[int, Callable[[], Any]],
) -> None:
"""Init the zone."""
super().__init__(zone_id=zone_id, zone=zone)
self._system_id = system_id
self._zone_updates = zone_updates
self._attr_unique_id = f"{system_id}_zone_{zone_id}_local"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self._attr_unique_id)},
manufacturer="Risco",
name=self._zone.name,
)
async def async_added_to_hass(self) -> None:
"""Subscribe to updates."""
self._zone_updates[self._zone_id] = self.async_write_ha_state
@property
def extra_state_attributes(self) -> Mapping[str, Any] | None:
"""Return the state attributes."""
return {
**(super().extra_state_attributes or {}),
"groups": self._zone.groups,
}
async def _bypass(self, bypass: bool) -> None:
await self._zone.bypass(bypass)

View File

@ -1,16 +1,21 @@
"""Config flow for Risco integration."""
from __future__ import annotations
import asyncio
from collections.abc import Mapping
import logging
from pyrisco import CannotConnectError, RiscoCloud, UnauthorizedError
from pyrisco import CannotConnectError, RiscoCloud, RiscoLocal, UnauthorizedError
import voluptuous as vol
from homeassistant import config_entries, core
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PIN,
CONF_PORT,
CONF_SCAN_INTERVAL,
CONF_TYPE,
CONF_USERNAME,
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_CUSTOM_BYPASS,
@ -27,18 +32,27 @@ from .const import (
DEFAULT_OPTIONS,
DOMAIN,
RISCO_STATES,
SLEEP_INTERVAL,
TYPE_LOCAL,
)
_LOGGER = logging.getLogger(__name__)
DATA_SCHEMA = vol.Schema(
CLOUD_SCHEMA = vol.Schema(
{
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
vol.Required(CONF_PIN): str,
}
)
LOCAL_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=1000): int,
vol.Required(CONF_PIN): str,
}
)
HA_STATES = [
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_HOME,
@ -47,10 +61,10 @@ HA_STATES = [
]
async def validate_input(hass: core.HomeAssistant, data):
"""Validate the user input allows us to connect.
async def validate_cloud_input(hass: core.HomeAssistant, data) -> dict[str, str]:
"""Validate the user input allows us to connect to Risco Cloud.
Data has the keys from DATA_SCHEMA with values provided by the user.
Data has the keys from CLOUD_SCHEMA with values provided by the user.
"""
risco = RiscoCloud(data[CONF_USERNAME], data[CONF_PASSWORD], data[CONF_PIN])
@ -62,6 +76,20 @@ async def validate_input(hass: core.HomeAssistant, data):
return {"title": risco.site_name}
async def validate_local_input(
hass: core.HomeAssistant, data: Mapping[str, str]
) -> dict[str, str]:
"""Validate the user input allows us to connect to a local panel.
Data has the keys from LOCAL_SCHEMA with values provided by the user.
"""
risco = RiscoLocal(data[CONF_HOST], data[CONF_PORT], data[CONF_PIN])
await risco.connect()
site_id = risco.id
await risco.disconnect()
return {"title": site_id}
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Risco."""
@ -77,13 +105,20 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
"""Handle the initial step."""
return self.async_show_menu(
step_id="user",
menu_options=["cloud", "local"],
)
async def async_step_cloud(self, user_input=None):
"""Configure a cloud based alarm."""
errors = {}
if user_input is not None:
await self.async_set_unique_id(user_input[CONF_USERNAME])
self._abort_if_unique_id_configured()
try:
info = await validate_input(self.hass, user_input)
info = await validate_cloud_input(self.hass, user_input)
except CannotConnectError:
errors["base"] = "cannot_connect"
except UnauthorizedError:
@ -95,7 +130,35 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_create_entry(title=info["title"], data=user_input)
return self.async_show_form(
step_id="user", data_schema=DATA_SCHEMA, errors=errors
step_id="cloud", data_schema=CLOUD_SCHEMA, errors=errors
)
async def async_step_local(self, user_input=None):
"""Configure a local based alarm."""
errors = {}
if user_input is not None:
try:
info = await validate_local_input(self.hass, user_input)
except CannotConnectError:
errors["base"] = "cannot_connect"
except UnauthorizedError:
errors["base"] = "invalid_auth"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(info["title"])
self._abort_if_unique_id_configured()
# Risco can hang if we don't wait before creating a new connection
await asyncio.sleep(SLEEP_INTERVAL)
return self.async_create_entry(
title=info["title"], data={**user_input, **{CONF_TYPE: TYPE_LOCAL}}
)
return self.async_show_form(
step_id="local", data_schema=LOCAL_SCHEMA, errors=errors
)

View File

@ -15,6 +15,8 @@ EVENTS_COORDINATOR = "risco_events"
DEFAULT_SCAN_INTERVAL = 30
TYPE_LOCAL = "local"
CONF_CODE_ARM_REQUIRED = "code_arm_required"
CONF_CODE_DISARM_REQUIRED = "code_disarm_required"
CONF_RISCO_STATES_TO_HA = "risco_states_to_ha"
@ -44,3 +46,5 @@ DEFAULT_OPTIONS = {
CONF_RISCO_STATES_TO_HA: DEFAULT_RISCO_STATES_TO_HA,
CONF_HA_STATES_TO_RISCO: DEFAULT_HA_STATES_TO_RISCO,
}
SLEEP_INTERVAL = 1

View File

@ -1,13 +1,15 @@
"""A risco entity base class."""
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import RiscoDataUpdateCoordinator
def binary_sensor_unique_id(risco, zone_id):
def binary_sensor_unique_id(risco, zone_id: int) -> str:
"""Return unique id for the binary sensor."""
return f"{risco.site_uuid}_zone_{zone_id}"
class RiscoEntity(CoordinatorEntity):
class RiscoEntity(CoordinatorEntity[RiscoDataUpdateCoordinator]):
"""Risco entity base class."""
def _get_data_from_coordinator(self):

View File

@ -6,6 +6,6 @@
"requirements": ["pyrisco==0.5.2"],
"codeowners": ["@OnFreund"],
"quality_scale": "platinum",
"iot_class": "cloud_polling",
"iot_class": "local_push",
"loggers": ["pyrisco"]
}

View File

@ -1,4 +1,9 @@
"""Sensor for Risco Events."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from homeassistant.components.binary_sensor import DOMAIN as BS_DOMAIN
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity
from homeassistant.config_entries import ConfigEntry
@ -8,6 +13,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.util import dt as dt_util
from . import RiscoEventsDataUpdateCoordinator, is_local
from .const import DOMAIN, EVENTS_COORDINATOR
from .entity import binary_sensor_unique_id
@ -38,7 +44,13 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up sensors for device."""
coordinator = hass.data[DOMAIN][config_entry.entry_id][EVENTS_COORDINATOR]
if is_local(config_entry):
# no events in local comm
return
coordinator: RiscoEventsDataUpdateCoordinator = hass.data[DOMAIN][
config_entry.entry_id
][EVENTS_COORDINATOR]
sensors = [
RiscoSensor(coordinator, id, [], name, config_entry.entry_id)
for id, name in CATEGORIES.items()
@ -62,19 +74,12 @@ class RiscoSensor(CoordinatorEntity, SensorEntity):
self._excludes = excludes
self._name = name
self._entry_id = entry_id
self._entity_registry = None
self._entity_registry: er.EntityRegistry | None = None
self._attr_unique_id = f"events_{name}_{self.coordinator.risco.site_uuid}"
self._attr_name = f"Risco {self.coordinator.risco.site_name} {name} Events"
self._attr_device_class = SensorDeviceClass.TIMESTAMP
@property
def name(self):
"""Return the name of the sensor."""
return f"Risco {self.coordinator.risco.site_name} {self._name} Events"
@property
def unique_id(self):
"""Return a unique id for this sensor."""
return f"events_{self._name}_{self.coordinator.risco.site_uuid}"
async def async_added_to_hass(self):
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
self._entity_registry = er.async_get(self.hass)
self.async_on_remove(
@ -103,7 +108,7 @@ class RiscoSensor(CoordinatorEntity, SensorEntity):
)
@property
def extra_state_attributes(self):
def extra_state_attributes(self) -> Mapping[str, Any] | None:
"""State attributes."""
if self._event is None:
return None
@ -120,8 +125,3 @@ class RiscoSensor(CoordinatorEntity, SensorEntity):
attrs["zone_entity_id"] = zone_entity_id
return attrs
@property
def device_class(self):
"""Device class of sensor."""
return SensorDeviceClass.TIMESTAMP

View File

@ -2,11 +2,24 @@
"config": {
"step": {
"user": {
"menu_options": {
"cloud": "Risco Cloud (recommended)",
"local": "Local Risco Panel (advanced)"
}
},
"cloud": {
"data": {
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]",
"pin": "[%key:common::config_flow::data::pin%]"
}
},
"local": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
"port": "[%key:common::config_flow::data::port%]",
"pin": "[%key:common::config_flow::data::pin%]"
}
}
},
"error": {

View File

@ -9,12 +9,25 @@
"unknown": "Unexpected error"
},
"step": {
"user": {
"cloud": {
"data": {
"password": "Password",
"pin": "PIN Code",
"username": "Username"
}
},
"local": {
"data": {
"host": "Host",
"pin": "PIN Code",
"port": "Port"
}
},
"user": {
"menu_options": {
"cloud": "Risco Cloud (recommended)",
"local": "Local Risco Panel (advanced)"
}
}
}
},

View File

@ -0,0 +1,148 @@
"""Fixtures for Risco tests."""
from unittest.mock import MagicMock, PropertyMock, patch
from pytest import fixture
from homeassistant.components.risco.const import DOMAIN, TYPE_LOCAL
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PIN,
CONF_PORT,
CONF_TYPE,
CONF_USERNAME,
)
from .util import TEST_SITE_NAME, TEST_SITE_UUID, zone_mock
from tests.common import MockConfigEntry
TEST_CLOUD_CONFIG = {
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
CONF_PIN: "1234",
}
TEST_LOCAL_CONFIG = {
CONF_TYPE: TYPE_LOCAL,
CONF_HOST: "test-host",
CONF_PORT: 5004,
CONF_PIN: "1234",
}
@fixture
def two_zone_cloud():
"""Fixture to mock alarm with two zones."""
zone_mocks = {0: zone_mock(), 1: zone_mock()}
alarm_mock = MagicMock()
with patch.object(
zone_mocks[0], "id", new_callable=PropertyMock(return_value=0)
), patch.object(
zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0")
), patch.object(
zone_mocks[1], "id", new_callable=PropertyMock(return_value=1)
), patch.object(
zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1")
), patch.object(
alarm_mock,
"zones",
new_callable=PropertyMock(return_value=zone_mocks),
), patch(
"homeassistant.components.risco.RiscoCloud.get_state",
return_value=alarm_mock,
):
yield zone_mocks
@fixture
def options():
"""Fixture for default (empty) options."""
return {}
@fixture
def events():
"""Fixture for default (empty) events."""
return []
@fixture
def cloud_config_entry(hass, options):
"""Fixture for a cloud config entry."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=TEST_CLOUD_CONFIG, options=options
)
config_entry.add_to_hass(hass)
return config_entry
@fixture
def login_with_error(exception):
"""Fixture to simulate error on login."""
with patch(
"homeassistant.components.risco.RiscoCloud.login",
side_effect=exception,
):
yield
@fixture
async def setup_risco_cloud(hass, cloud_config_entry, events):
"""Set up a Risco integration for testing."""
with patch(
"homeassistant.components.risco.RiscoCloud.login",
return_value=True,
), patch(
"homeassistant.components.risco.RiscoCloud.site_uuid",
new_callable=PropertyMock(return_value=TEST_SITE_UUID),
), patch(
"homeassistant.components.risco.RiscoCloud.site_name",
new_callable=PropertyMock(return_value=TEST_SITE_NAME),
), patch(
"homeassistant.components.risco.RiscoCloud.close"
), patch(
"homeassistant.components.risco.RiscoCloud.get_events",
return_value=events,
):
await hass.config_entries.async_setup(cloud_config_entry.entry_id)
await hass.async_block_till_done()
yield cloud_config_entry
@fixture
def local_config_entry(hass, options):
"""Fixture for a local config entry."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=TEST_LOCAL_CONFIG, options=options
)
config_entry.add_to_hass(hass)
return config_entry
@fixture
def connect_with_error(exception):
"""Fixture to simulate error on connect."""
with patch(
"homeassistant.components.risco.RiscoLocal.connect",
side_effect=exception,
):
yield
@fixture
async def setup_risco_local(hass, local_config_entry):
"""Set up a local Risco integration for testing."""
with patch(
"homeassistant.components.risco.RiscoLocal.connect",
return_value=True,
), patch(
"homeassistant.components.risco.RiscoLocal.id",
new_callable=PropertyMock(return_value=TEST_SITE_UUID),
), patch(
"homeassistant.components.risco.RiscoLocal.disconnect"
):
await hass.config_entries.async_setup(local_config_entry.entry_id)
await hass.async_block_till_done()
yield local_config_entry

File diff suppressed because it is too large Load Diff

View File

@ -1,62 +1,55 @@
"""Tests for the Risco binary sensors."""
from unittest.mock import PropertyMock, patch
import pytest
from homeassistant.components.risco import CannotConnectError, UnauthorizedError
from homeassistant.components.risco.const import DOMAIN
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.entity_component import async_update_entity
from .util import TEST_CONFIG, TEST_SITE_UUID, setup_risco
from .util import two_zone_alarm # noqa: F401
from tests.common import MockConfigEntry
from .util import TEST_SITE_UUID, zone_mock
FIRST_ENTITY_ID = "binary_sensor.zone_0"
SECOND_ENTITY_ID = "binary_sensor.zone_1"
async def test_cannot_connect(hass):
"""Test connection error."""
with patch(
"homeassistant.components.risco.RiscoCloud.login",
side_effect=CannotConnectError,
@pytest.fixture
def two_zone_local():
"""Fixture to mock alarm with two zones."""
zone_mocks = {0: zone_mock(), 1: zone_mock()}
with patch.object(
zone_mocks[0], "id", new_callable=PropertyMock(return_value=0)
), patch.object(
zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0")
), patch.object(
zone_mocks[1], "id", new_callable=PropertyMock(return_value=1)
), patch.object(
zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1")
), patch(
"homeassistant.components.risco.RiscoLocal.partitions",
new_callable=PropertyMock(return_value={}),
), patch(
"homeassistant.components.risco.RiscoLocal.zones",
new_callable=PropertyMock(return_value=zone_mocks),
):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
registry = er.async_get(hass)
assert not registry.async_is_registered(FIRST_ENTITY_ID)
assert not registry.async_is_registered(SECOND_ENTITY_ID)
yield zone_mocks
async def test_unauthorized(hass):
"""Test unauthorized error."""
with patch(
"homeassistant.components.risco.RiscoCloud.login",
side_effect=UnauthorizedError,
):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
registry = er.async_get(hass)
assert not registry.async_is_registered(FIRST_ENTITY_ID)
assert not registry.async_is_registered(SECOND_ENTITY_ID)
async def test_setup(hass, two_zone_alarm): # noqa: F811
"""Test entity setup."""
@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError])
async def test_error_on_login(hass, login_with_error, cloud_config_entry):
"""Test error on login."""
await hass.config_entries.async_setup(cloud_config_entry.entry_id)
await hass.async_block_till_done()
registry = er.async_get(hass)
assert not registry.async_is_registered(FIRST_ENTITY_ID)
assert not registry.async_is_registered(SECOND_ENTITY_ID)
await setup_risco(hass)
async def test_cloud_setup(hass, two_zone_cloud, setup_risco_cloud):
"""Test entity setup."""
registry = er.async_get(hass)
assert registry.async_is_registered(FIRST_ENTITY_ID)
assert registry.async_is_registered(SECOND_ENTITY_ID)
@ -70,13 +63,13 @@ async def test_setup(hass, two_zone_alarm): # noqa: F811
assert device.manufacturer == "Risco"
async def _check_state(hass, alarm, triggered, bypassed, entity_id, zone_id):
async def _check_cloud_state(hass, zones, triggered, bypassed, entity_id, zone_id):
with patch.object(
alarm.zones[zone_id],
zones[zone_id],
"triggered",
new_callable=PropertyMock(return_value=triggered),
), patch.object(
alarm.zones[zone_id],
zones[zone_id],
"bypassed",
new_callable=PropertyMock(return_value=bypassed),
):
@ -89,23 +82,20 @@ async def _check_state(hass, alarm, triggered, bypassed, entity_id, zone_id):
assert hass.states.get(entity_id).attributes["zone_id"] == zone_id
async def test_states(hass, two_zone_alarm): # noqa: F811
async def test_cloud_states(hass, two_zone_cloud, setup_risco_cloud):
"""Test the various alarm states."""
await setup_risco(hass)
await _check_state(hass, two_zone_alarm, True, True, FIRST_ENTITY_ID, 0)
await _check_state(hass, two_zone_alarm, True, False, FIRST_ENTITY_ID, 0)
await _check_state(hass, two_zone_alarm, False, True, FIRST_ENTITY_ID, 0)
await _check_state(hass, two_zone_alarm, False, False, FIRST_ENTITY_ID, 0)
await _check_state(hass, two_zone_alarm, True, True, SECOND_ENTITY_ID, 1)
await _check_state(hass, two_zone_alarm, True, False, SECOND_ENTITY_ID, 1)
await _check_state(hass, two_zone_alarm, False, True, SECOND_ENTITY_ID, 1)
await _check_state(hass, two_zone_alarm, False, False, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, True, True, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, True, False, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, False, True, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, False, False, FIRST_ENTITY_ID, 0)
await _check_cloud_state(hass, two_zone_cloud, True, True, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, True, False, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, False, True, SECOND_ENTITY_ID, 1)
await _check_cloud_state(hass, two_zone_cloud, False, False, SECOND_ENTITY_ID, 1)
async def test_bypass(hass, two_zone_alarm): # noqa: F811
async def test_cloud_bypass(hass, two_zone_cloud, setup_risco_cloud):
"""Test bypassing a zone."""
await setup_risco(hass)
with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
@ -116,9 +106,8 @@ async def test_bypass(hass, two_zone_alarm): # noqa: F811
mock.assert_awaited_once_with(0, True)
async def test_unbypass(hass, two_zone_alarm): # noqa: F811
async def test_cloud_unbypass(hass, two_zone_cloud, setup_risco_cloud):
"""Test unbypassing a zone."""
await setup_risco(hass)
with patch("homeassistant.components.risco.RiscoCloud.bypass_zone") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
@ -127,3 +116,113 @@ async def test_unbypass(hass, two_zone_alarm): # noqa: F811
)
mock.assert_awaited_once_with(0, False)
@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError])
async def test_error_on_connect(hass, connect_with_error, local_config_entry):
"""Test error on connect."""
await hass.config_entries.async_setup(local_config_entry.entry_id)
await hass.async_block_till_done()
registry = er.async_get(hass)
assert not registry.async_is_registered(FIRST_ENTITY_ID)
assert not registry.async_is_registered(SECOND_ENTITY_ID)
async def test_local_setup(hass, two_zone_local, setup_risco_local):
"""Test entity setup."""
registry = er.async_get(hass)
assert registry.async_is_registered(FIRST_ENTITY_ID)
assert registry.async_is_registered(SECOND_ENTITY_ID)
registry = dr.async_get(hass)
device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_zone_0_local")})
assert device is not None
assert device.manufacturer == "Risco"
device = registry.async_get_device({(DOMAIN, TEST_SITE_UUID + "_zone_1_local")})
assert device is not None
assert device.manufacturer == "Risco"
async def _check_local_state(
hass, zones, triggered, bypassed, entity_id, zone_id, callback
):
with patch.object(
zones[zone_id],
"triggered",
new_callable=PropertyMock(return_value=triggered),
), patch.object(
zones[zone_id],
"bypassed",
new_callable=PropertyMock(return_value=bypassed),
):
await callback(zone_id, zones[zone_id])
expected_triggered = STATE_ON if triggered else STATE_OFF
assert hass.states.get(entity_id).state == expected_triggered
assert hass.states.get(entity_id).attributes["bypassed"] == bypassed
assert hass.states.get(entity_id).attributes["zone_id"] == zone_id
@pytest.fixture
def _mock_zone_handler():
with patch("homeassistant.components.risco.RiscoLocal.add_zone_handler") as mock:
yield mock
async def test_local_states(
hass, two_zone_local, _mock_zone_handler, setup_risco_local
):
"""Test the various alarm states."""
callback = _mock_zone_handler.call_args.args[0]
assert callback is not None
await _check_local_state(
hass, two_zone_local, True, True, FIRST_ENTITY_ID, 0, callback
)
await _check_local_state(
hass, two_zone_local, True, False, FIRST_ENTITY_ID, 0, callback
)
await _check_local_state(
hass, two_zone_local, False, True, FIRST_ENTITY_ID, 0, callback
)
await _check_local_state(
hass, two_zone_local, False, False, FIRST_ENTITY_ID, 0, callback
)
await _check_local_state(
hass, two_zone_local, True, True, SECOND_ENTITY_ID, 1, callback
)
await _check_local_state(
hass, two_zone_local, True, False, SECOND_ENTITY_ID, 1, callback
)
await _check_local_state(
hass, two_zone_local, False, True, SECOND_ENTITY_ID, 1, callback
)
await _check_local_state(
hass, two_zone_local, False, False, SECOND_ENTITY_ID, 1, callback
)
async def test_local_bypass(hass, two_zone_local, setup_risco_local):
"""Test bypassing a zone."""
with patch.object(two_zone_local[0], "bypass") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
DOMAIN, "bypass_zone", service_data=data, blocking=True
)
mock.assert_awaited_once_with(True)
async def test_local_unbypass(hass, two_zone_local, setup_risco_local):
"""Test unbypassing a zone."""
with patch.object(two_zone_local[0], "bypass") as mock:
data = {"entity_id": FIRST_ENTITY_ID}
await hass.services.async_call(
DOMAIN, "unbypass_zone", service_data=data, blocking=True
)
mock.assert_awaited_once_with(False)

View File

@ -4,22 +4,29 @@ from unittest.mock import PropertyMock, patch
import pytest
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant import config_entries
from homeassistant.components.risco.config_flow import (
CannotConnectError,
UnauthorizedError,
)
from homeassistant.components.risco.const import DOMAIN
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
TEST_SITE_NAME = "test-site-name"
TEST_DATA = {
TEST_CLOUD_DATA = {
"username": "test-username",
"password": "test-password",
"pin": "1234",
}
TEST_LOCAL_DATA = {
"host": "test-host",
"port": 5004,
"pin": "1234",
}
TEST_RISCO_TO_HA = {
"arm": "armed_away",
"partial_arm": "armed_home",
@ -42,13 +49,19 @@ TEST_OPTIONS = {
}
async def test_form(hass):
"""Test we get the form."""
async def test_cloud_form(hass):
"""Test we get the cloud form."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
assert result["type"] == FlowResultType.MENU
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "cloud"}
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {}
with patch(
"homeassistant.components.risco.config_flow.RiscoCloud.login",
@ -59,17 +72,20 @@ async def test_form(hass):
), patch(
"homeassistant.components.risco.config_flow.RiscoCloud.close"
) as mock_close, patch(
"homeassistant.components.risco.config_flow.SLEEP_INTERVAL",
0,
), patch(
"homeassistant.components.risco.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_DATA
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"], TEST_CLOUD_DATA
)
await hass.async_block_till_done()
assert result2["type"] == "create_entry"
assert result2["title"] == TEST_SITE_NAME
assert result2["data"] == TEST_DATA
assert result3["type"] == FlowResultType.CREATE_ENTRY
assert result3["title"] == TEST_SITE_NAME
assert result3["data"] == TEST_CLOUD_DATA
assert len(mock_setup_entry.mock_calls) == 1
mock_close.assert_awaited_once()
@ -82,33 +98,33 @@ async def test_form(hass):
(Exception, "unknown"),
],
)
async def test_error(hass, exception, error):
async def test_cloud_error(hass, login_with_error, error):
"""Test we handle config flow errors."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "cloud"}
)
with patch(
"homeassistant.components.risco.config_flow.RiscoCloud.login",
side_effect=exception,
), patch(
"homeassistant.components.risco.config_flow.RiscoCloud.close"
) as mock_close:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_DATA
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"], TEST_CLOUD_DATA
)
mock_close.assert_awaited_once()
assert result2["type"] == "form"
assert result2["errors"] == {"base": error}
assert result3["type"] == FlowResultType.FORM
assert result3["errors"] == {"base": error}
async def test_form_already_exists(hass):
async def test_form_cloud_already_exists(hass):
"""Test that a flow with an existing username aborts."""
entry = MockConfigEntry(
domain=DOMAIN,
unique_id=TEST_DATA["username"],
data=TEST_DATA,
unique_id=TEST_CLOUD_DATA["username"],
data=TEST_CLOUD_DATA,
)
entry.add_to_hass(hass)
@ -118,33 +134,139 @@ async def test_form_already_exists(hass):
)
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_DATA
result["flow_id"], {"next_step_id": "cloud"}
)
assert result2["type"] == "abort"
assert result2["reason"] == "already_configured"
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"], TEST_CLOUD_DATA
)
assert result3["type"] == FlowResultType.ABORT
assert result3["reason"] == "already_configured"
async def test_local_form(hass):
"""Test we get the local form."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == FlowResultType.MENU
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "local"}
)
assert result2["type"] == FlowResultType.FORM
assert result2["errors"] == {}
with patch(
"homeassistant.components.risco.config_flow.RiscoLocal.connect",
return_value=True,
), patch(
"homeassistant.components.risco.config_flow.RiscoLocal.id",
new_callable=PropertyMock(return_value=TEST_SITE_NAME),
), patch(
"homeassistant.components.risco.config_flow.RiscoLocal.disconnect"
) as mock_close, patch(
"homeassistant.components.risco.config_flow.SLEEP_INTERVAL",
0,
), patch(
"homeassistant.components.risco.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"], TEST_LOCAL_DATA
)
await hass.async_block_till_done()
expected_data = {**TEST_LOCAL_DATA, **{"type": "local"}}
assert result3["type"] == FlowResultType.CREATE_ENTRY
assert result3["title"] == TEST_SITE_NAME
assert result3["data"] == expected_data
assert len(mock_setup_entry.mock_calls) == 1
mock_close.assert_awaited_once()
@pytest.mark.parametrize(
"exception, error",
[
(UnauthorizedError, "invalid_auth"),
(CannotConnectError, "cannot_connect"),
(Exception, "unknown"),
],
)
async def test_local_error(hass, connect_with_error, error):
"""Test we handle config flow errors."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "local"}
)
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"], TEST_LOCAL_DATA
)
assert result3["type"] == FlowResultType.FORM
assert result3["errors"] == {"base": error}
async def test_form_local_already_exists(hass):
"""Test that a flow with an existing host aborts."""
entry = MockConfigEntry(
domain=DOMAIN,
unique_id=TEST_SITE_NAME,
data=TEST_LOCAL_DATA,
)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], {"next_step_id": "local"}
)
with patch(
"homeassistant.components.risco.config_flow.RiscoLocal.connect",
return_value=True,
), patch(
"homeassistant.components.risco.config_flow.RiscoLocal.id",
new_callable=PropertyMock(return_value=TEST_SITE_NAME),
), patch(
"homeassistant.components.risco.config_flow.RiscoLocal.disconnect"
):
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"], TEST_LOCAL_DATA
)
assert result3["type"] == FlowResultType.ABORT
assert result3["reason"] == "already_configured"
async def test_options_flow(hass):
"""Test options flow."""
entry = MockConfigEntry(
domain=DOMAIN,
unique_id=TEST_DATA["username"],
data=TEST_DATA,
unique_id=TEST_CLOUD_DATA["username"],
data=TEST_CLOUD_DATA,
)
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=TEST_OPTIONS,
)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "risco_to_ha"
result = await hass.config_entries.options.async_configure(
@ -152,7 +274,7 @@ async def test_options_flow(hass):
user_input=TEST_RISCO_TO_HA,
)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "ha_to_risco"
with patch("homeassistant.components.risco.async_setup_entry", return_value=True):
@ -161,7 +283,7 @@ async def test_options_flow(hass):
user_input=TEST_HA_TO_RISCO,
)
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
assert result["type"] == FlowResultType.CREATE_ENTRY
assert entry.options == {
**TEST_OPTIONS,
"risco_states_to_ha": TEST_RISCO_TO_HA,
@ -173,8 +295,8 @@ async def test_ha_to_risco_schema(hass):
"""Test that the schema for the ha-to-risco mapping step is generated properly."""
entry = MockConfigEntry(
domain=DOMAIN,
unique_id=TEST_DATA["username"],
data=TEST_DATA,
unique_id=TEST_CLOUD_DATA["username"],
data=TEST_CLOUD_DATA,
)
entry.add_to_hass(hass)

View File

@ -2,19 +2,17 @@
from datetime import timedelta
from unittest.mock import MagicMock, PropertyMock, patch
import pytest
from homeassistant.components.risco import (
LAST_EVENT_TIMESTAMP_KEY,
CannotConnectError,
UnauthorizedError,
)
from homeassistant.components.risco.const import DOMAIN
from homeassistant.helpers import entity_registry as er
from homeassistant.util import dt
from .util import TEST_CONFIG, TEST_SITE_UUID, setup_risco
from .util import two_zone_alarm # noqa: F401
from tests.common import MockConfigEntry, async_fire_time_changed
from tests.common import async_fire_time_changed
ENTITY_IDS = {
"Alarm": "sensor.risco_test_site_name_alarm_events",
@ -109,34 +107,23 @@ CATEGORIES_TO_EVENTS = {
}
async def test_cannot_connect(hass):
"""Test connection error."""
@pytest.fixture
def _no_zones_and_partitions():
with patch(
"homeassistant.components.risco.RiscoCloud.login",
side_effect=CannotConnectError,
"homeassistant.components.risco.RiscoLocal.zones",
new_callable=PropertyMock(return_value=[]),
), patch(
"homeassistant.components.risco.RiscoLocal.partitions",
new_callable=PropertyMock(return_value=[]),
):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
registry = er.async_get(hass)
for id in ENTITY_IDS.values():
assert not registry.async_is_registered(id)
yield
async def test_unauthorized(hass):
"""Test unauthorized error."""
with patch(
"homeassistant.components.risco.RiscoCloud.login",
side_effect=UnauthorizedError,
):
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
@pytest.mark.parametrize("exception", [CannotConnectError, UnauthorizedError])
async def test_error_on_login(hass, login_with_error, cloud_config_entry):
"""Test error on login."""
await hass.config_entries.async_setup(cloud_config_entry.entry_id)
await hass.async_block_till_done()
registry = er.async_get(hass)
for id in ENTITY_IDS.values():
@ -166,29 +153,31 @@ def _check_state(hass, category, entity_id):
assert "zone_entity_id" not in state.attributes
async def test_setup(hass, two_zone_alarm): # noqa: F811
"""Test entity setup."""
@pytest.fixture
def _set_utc_time_zone(hass):
hass.config.set_time_zone("UTC")
registry = er.async_get(hass)
for id in ENTITY_IDS.values():
assert not registry.async_is_registered(id)
@pytest.fixture
def _save_mock():
with patch(
"homeassistant.components.risco.RiscoCloud.site_uuid",
new_callable=PropertyMock(return_value=TEST_SITE_UUID),
), patch(
"homeassistant.components.risco.Store.async_save",
) as save_mock:
await setup_risco(hass, TEST_EVENTS)
for id in ENTITY_IDS.values():
assert registry.async_is_registered(id)
yield save_mock
save_mock.assert_awaited_once_with(
{LAST_EVENT_TIMESTAMP_KEY: TEST_EVENTS[0].time}
)
for category, entity_id in ENTITY_IDS.items():
_check_state(hass, category, entity_id)
@pytest.mark.parametrize("events", [TEST_EVENTS])
async def test_cloud_setup(
hass, two_zone_cloud, _set_utc_time_zone, _save_mock, setup_risco_cloud
):
"""Test entity setup."""
registry = er.async_get(hass)
for id in ENTITY_IDS.values():
assert registry.async_is_registered(id)
_save_mock.assert_awaited_once_with({LAST_EVENT_TIMESTAMP_KEY: TEST_EVENTS[0].time})
for category, entity_id in ENTITY_IDS.items():
_check_state(hass, category, entity_id)
with patch(
"homeassistant.components.risco.RiscoCloud.get_events", return_value=[]
@ -202,3 +191,10 @@ async def test_setup(hass, two_zone_alarm): # noqa: F811
for category, entity_id in ENTITY_IDS.items():
_check_state(hass, category, entity_id)
async def test_local_setup(hass, setup_risco_local, _no_zones_and_partitions):
"""Test entity setup."""
registry = er.async_get(hass)
for id in ENTITY_IDS.values():
assert not registry.async_is_registered(id)

View File

@ -1,74 +1,12 @@
"""Utilities for Risco tests."""
from unittest.mock import MagicMock, PropertyMock, patch
from unittest.mock import AsyncMock, MagicMock
from pytest import fixture
from homeassistant.components.risco.const import DOMAIN
from homeassistant.const import CONF_PASSWORD, CONF_PIN, CONF_USERNAME
from tests.common import MockConfigEntry
TEST_CONFIG = {
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
CONF_PIN: "1234",
}
TEST_SITE_UUID = "test-site-uuid"
TEST_SITE_NAME = "test-site-name"
async def setup_risco(hass, events=[], options={}):
"""Set up a Risco integration for testing."""
config_entry = MockConfigEntry(domain=DOMAIN, data=TEST_CONFIG, options=options)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.risco.RiscoCloud.login",
return_value=True,
), patch(
"homeassistant.components.risco.RiscoCloud.site_uuid",
new_callable=PropertyMock(return_value=TEST_SITE_UUID),
), patch(
"homeassistant.components.risco.RiscoCloud.site_name",
new_callable=PropertyMock(return_value=TEST_SITE_NAME),
), patch(
"homeassistant.components.risco.RiscoCloud.close"
), patch(
"homeassistant.components.risco.RiscoCloud.get_events",
return_value=events,
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry
def _zone_mock():
def zone_mock():
"""Return a mocked zone."""
return MagicMock(
triggered=False,
bypassed=False,
triggered=False, bypassed=False, bypass=AsyncMock(return_value=True)
)
@fixture
def two_zone_alarm():
"""Fixture to mock alarm with two zones."""
zone_mocks = {0: _zone_mock(), 1: _zone_mock()}
alarm_mock = MagicMock()
with patch.object(
zone_mocks[0], "id", new_callable=PropertyMock(return_value=0)
), patch.object(
zone_mocks[0], "name", new_callable=PropertyMock(return_value="Zone 0")
), patch.object(
zone_mocks[1], "id", new_callable=PropertyMock(return_value=1)
), patch.object(
zone_mocks[1], "name", new_callable=PropertyMock(return_value="Zone 1")
), patch.object(
alarm_mock,
"zones",
new_callable=PropertyMock(return_value=zone_mocks),
), patch(
"homeassistant.components.risco.RiscoCloud.get_state",
return_value=alarm_mock,
):
yield alarm_mock