Add Custom mapping of Risco states (#39218)

* Custom mapping of Risco states

* More informative error log

* Add alternative Risco terms

* Black formatting
This commit is contained in:
On Freund 2020-08-28 05:23:01 +03:00 committed by GitHub
parent 24db31fa28
commit b14af3e727
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 464 additions and 111 deletions

View File

@ -104,4 +104,4 @@ class RiscoDataUpdateCoordinator(DataUpdateCoordinator):
try:
return await self.risco.get_state()
except (CannotConnectError, UnauthorizedError, OperationError) as error:
raise UpdateFailed from error
raise UpdateFailed(error) from error

View File

@ -7,12 +7,16 @@ from homeassistant.components.alarm_control_panel import (
)
from homeassistant.components.alarm_control_panel.const import (
SUPPORT_ALARM_ARM_AWAY,
SUPPORT_ALARM_ARM_CUSTOM_BYPASS,
SUPPORT_ALARM_ARM_HOME,
SUPPORT_ALARM_ARM_NIGHT,
)
from homeassistant.const import (
CONF_PIN,
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_CUSTOM_BYPASS,
STATE_ALARM_ARMED_HOME,
STATE_ALARM_ARMED_NIGHT,
STATE_ALARM_ARMING,
STATE_ALARM_DISARMED,
STATE_ALARM_TRIGGERED,
@ -21,29 +25,33 @@ from homeassistant.const import (
from .const import (
CONF_CODE_ARM_REQUIRED,
CONF_CODE_DISARM_REQUIRED,
CONF_HA_STATES_TO_RISCO,
CONF_RISCO_STATES_TO_HA,
DATA_COORDINATOR,
DEFAULT_OPTIONS,
DOMAIN,
RISCO_ARM,
RISCO_GROUPS,
RISCO_PARTIAL_ARM,
)
from .entity import RiscoEntity
_LOGGER = logging.getLogger(__name__)
SUPPORTED_STATES = [
STATE_ALARM_DISARMED,
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_HOME,
STATE_ALARM_TRIGGERED,
]
STATES_TO_SUPPORTED_FEATURES = {
STATE_ALARM_ARMED_AWAY: SUPPORT_ALARM_ARM_AWAY,
STATE_ALARM_ARMED_CUSTOM_BYPASS: SUPPORT_ALARM_ARM_CUSTOM_BYPASS,
STATE_ALARM_ARMED_HOME: SUPPORT_ALARM_ARM_HOME,
STATE_ALARM_ARMED_NIGHT: SUPPORT_ALARM_ARM_NIGHT,
}
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Risco alarm control panel."""
coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR]
code = config_entry.data[CONF_PIN]
code_arm_req = config_entry.options.get(CONF_CODE_ARM_REQUIRED, False)
code_disarm_req = config_entry.options.get(CONF_CODE_DISARM_REQUIRED, False)
options = {**DEFAULT_OPTIONS, **config_entry.options}
entities = [
RiscoAlarm(coordinator, partition_id, code, code_arm_req, code_disarm_req)
RiscoAlarm(coordinator, partition_id, config_entry.data[CONF_PIN], options)
for partition_id in coordinator.data.partitions
]
@ -53,16 +61,19 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity):
"""Representation of a Risco partition."""
def __init__(
self, coordinator, partition_id, code, code_arm_required, code_disarm_required
):
def __init__(self, coordinator, partition_id, code, options):
"""Init the partition."""
super().__init__(coordinator)
self._partition_id = partition_id
self._partition = self._coordinator.data.partitions[self._partition_id]
self._code = code
self._code_arm_required = code_arm_required
self._code_disarm_required = code_disarm_required
self._code_arm_required = options[CONF_CODE_ARM_REQUIRED]
self._code_disarm_required = options[CONF_CODE_DISARM_REQUIRED]
self._risco_to_ha = options[CONF_RISCO_STATES_TO_HA]
self._ha_to_risco = options[CONF_HA_STATES_TO_RISCO]
self._supported_states = 0
for state in self._ha_to_risco:
self._supported_states |= STATES_TO_SUPPORTED_FEATURES[state]
def _get_data_from_coordinator(self):
self._partition = self._coordinator.data.partitions[self._partition_id]
@ -93,19 +104,23 @@ class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity):
return STATE_ALARM_TRIGGERED
if self._partition.arming:
return STATE_ALARM_ARMING
if self._partition.armed:
return STATE_ALARM_ARMED_AWAY
if self._partition.partially_armed:
return STATE_ALARM_ARMED_HOME
if self._partition.disarmed:
return STATE_ALARM_DISARMED
if self._partition.armed:
return self._risco_to_ha[RISCO_ARM]
if self._partition.partially_armed:
for group, armed in self._partition.groups.items():
if armed:
return self._risco_to_ha[group]
return self._risco_to_ha[RISCO_PARTIAL_ARM]
return None
@property
def supported_features(self):
"""Return the list of supported features."""
return SUPPORT_ALARM_ARM_HOME | SUPPORT_ALARM_ARM_AWAY
return self._supported_states
@property
def code_arm_required(self):
@ -117,32 +132,49 @@ class RiscoAlarm(AlarmControlPanelEntity, RiscoEntity):
"""Return one or more digits/characters."""
return FORMAT_NUMBER
def _validate_code(self, code, state):
def _validate_code(self, code):
"""Validate given code."""
check = code == self._code
if not check:
_LOGGER.warning("Wrong code entered for %s", state)
return check
return code == self._code
async def async_alarm_disarm(self, code=None):
"""Send disarm command."""
if self._code_disarm_required and not self._validate_code(code, "disarming"):
if self._code_disarm_required and not self._validate_code(code):
_LOGGER.warning("Wrong code entered for disarming")
return
await self._call_alarm_method("disarm")
async def async_alarm_arm_home(self, code=None):
"""Send arm home command."""
if self._code_arm_required and not self._validate_code(code, "arming home"):
return
await self._call_alarm_method("partial_arm")
await self._arm(STATE_ALARM_ARMED_HOME, code)
async def async_alarm_arm_away(self, code=None):
"""Send arm away command."""
if self._code_arm_required and not self._validate_code(code, "arming away"):
return
await self._call_alarm_method("arm")
await self._arm(STATE_ALARM_ARMED_AWAY, code)
async def _call_alarm_method(self, method):
alarm_obj = await getattr(self._risco, method)(self._partition_id)
self._partition = alarm_obj.partitions[self._partition_id]
async def async_alarm_arm_night(self, code=None):
"""Send arm night command."""
await self._arm(STATE_ALARM_ARMED_NIGHT, code)
async def async_alarm_arm_custom_bypass(self, code=None):
"""Send arm custom bypass command."""
await self._arm(STATE_ALARM_ARMED_CUSTOM_BYPASS, code)
async def _arm(self, mode, code):
if self._code_arm_required and not self._validate_code(code):
_LOGGER.warning("Wrong code entered for %s", mode)
return
risco_state = self._ha_to_risco[mode]
if not risco_state:
_LOGGER.warning("No mapping for mode %s", mode)
return
if risco_state in RISCO_GROUPS:
await self._call_alarm_method("group_arm", risco_state)
else:
await self._call_alarm_method(risco_state)
async def _call_alarm_method(self, method, *args):
alarm = await getattr(self._risco, method)(self._partition_id, *args)
self._partition = alarm.partitions[self._partition_id]
self.async_write_ha_state()

View File

@ -10,13 +10,20 @@ from homeassistant.const import (
CONF_PIN,
CONF_SCAN_INTERVAL,
CONF_USERNAME,
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_CUSTOM_BYPASS,
STATE_ALARM_ARMED_HOME,
STATE_ALARM_ARMED_NIGHT,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import (
CONF_CODE_ARM_REQUIRED,
CONF_CODE_DISARM_REQUIRED,
DEFAULT_SCAN_INTERVAL,
CONF_HA_STATES_TO_RISCO,
CONF_RISCO_STATES_TO_HA,
DEFAULT_OPTIONS,
RISCO_STATES,
)
from .const import DOMAIN # pylint:disable=unused-import
@ -24,6 +31,12 @@ _LOGGER = logging.getLogger(__name__)
DATA_SCHEMA = vol.Schema({CONF_USERNAME: str, CONF_PASSWORD: str, CONF_PIN: str})
HA_STATES = [
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_HOME,
STATE_ALARM_ARMED_NIGHT,
STATE_ALARM_ARMED_CUSTOM_BYPASS,
]
async def validate_input(hass: core.HomeAssistant, data):
@ -83,22 +96,20 @@ class RiscoOptionsFlowHandler(config_entries.OptionsFlow):
def __init__(self, config_entry):
"""Initialize."""
self.config_entry = config_entry
self._data = {**DEFAULT_OPTIONS, **config_entry.options}
def _options_schema(self):
scan_interval = self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
code_arm_required = self.config_entry.options.get(CONF_CODE_ARM_REQUIRED, False)
code_disarm_required = self.config_entry.options.get(
CONF_CODE_DISARM_REQUIRED, False
)
return vol.Schema(
{
vol.Required(CONF_SCAN_INTERVAL, default=scan_interval): int,
vol.Required(CONF_CODE_ARM_REQUIRED, default=code_arm_required): bool,
vol.Required(
CONF_CODE_DISARM_REQUIRED, default=code_disarm_required
CONF_SCAN_INTERVAL, default=self._data[CONF_SCAN_INTERVAL]
): int,
vol.Required(
CONF_CODE_ARM_REQUIRED, default=self._data[CONF_CODE_ARM_REQUIRED]
): bool,
vol.Required(
CONF_CODE_DISARM_REQUIRED,
default=self._data[CONF_CODE_DISARM_REQUIRED],
): bool,
}
)
@ -106,6 +117,53 @@ class RiscoOptionsFlowHandler(config_entries.OptionsFlow):
async def async_step_init(self, user_input=None):
"""Manage the options."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
self._data = {**self._data, **user_input}
return await self.async_step_risco_to_ha()
return self.async_show_form(step_id="init", data_schema=self._options_schema())
async def async_step_risco_to_ha(self, user_input=None):
"""Map Risco states to HA states."""
if user_input is not None:
self._data[CONF_RISCO_STATES_TO_HA] = user_input
return await self.async_step_ha_to_risco()
risco_to_ha = self._data[CONF_RISCO_STATES_TO_HA]
options = vol.Schema(
{
vol.Required(risco_state, default=risco_to_ha[risco_state]): vol.In(
HA_STATES
)
for risco_state in RISCO_STATES
}
)
return self.async_show_form(step_id="risco_to_ha", data_schema=options)
async def async_step_ha_to_risco(self, user_input=None):
"""Map HA states to Risco states."""
if user_input is not None:
self._data[CONF_HA_STATES_TO_RISCO] = user_input
return self.async_create_entry(title="", data=self._data)
options = {}
risco_to_ha = self._data[CONF_RISCO_STATES_TO_HA]
# we iterate over HA_STATES, instead of set(self._risco_to_ha.values())
# to ensure a consistent order
for ha_state in HA_STATES:
if ha_state not in risco_to_ha.values():
continue
values = [
risco_state
for risco_state in RISCO_STATES
if risco_to_ha[risco_state] == ha_state
]
current = self._data[CONF_HA_STATES_TO_RISCO].get(ha_state)
if current not in values:
current = values[0]
options[vol.Required(ha_state, default=current)] = vol.In(values)
return self.async_show_form(
step_id="ha_to_risco", data_schema=vol.Schema(options)
)

View File

@ -1,5 +1,11 @@
"""Constants for the Risco integration."""
from homeassistant.const import (
CONF_SCAN_INTERVAL,
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_HOME,
)
DOMAIN = "risco"
DATA_COORDINATOR = "risco"
@ -8,3 +14,30 @@ DEFAULT_SCAN_INTERVAL = 30
CONF_CODE_ARM_REQUIRED = "code_arm_required"
CONF_CODE_DISARM_REQUIRED = "code_disarm_required"
CONF_RISCO_STATES_TO_HA = "risco_states_to_ha"
CONF_HA_STATES_TO_RISCO = "ha_states_to_risco"
RISCO_GROUPS = ["A", "B", "C", "D"]
RISCO_ARM = "arm"
RISCO_PARTIAL_ARM = "partial_arm"
RISCO_STATES = [RISCO_ARM, RISCO_PARTIAL_ARM, *RISCO_GROUPS]
DEFAULT_RISCO_GROUPS_TO_HA = {group: STATE_ALARM_ARMED_HOME for group in RISCO_GROUPS}
DEFAULT_RISCO_STATES_TO_HA = {
RISCO_ARM: STATE_ALARM_ARMED_AWAY,
RISCO_PARTIAL_ARM: STATE_ALARM_ARMED_HOME,
**DEFAULT_RISCO_GROUPS_TO_HA,
}
DEFAULT_HA_STATES_TO_RISCO = {
STATE_ALARM_ARMED_AWAY: RISCO_ARM,
STATE_ALARM_ARMED_HOME: RISCO_PARTIAL_ARM,
}
DEFAULT_OPTIONS = {
CONF_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL,
CONF_CODE_ARM_REQUIRED: False,
CONF_CODE_DISARM_REQUIRED: False,
CONF_RISCO_STATES_TO_HA: DEFAULT_RISCO_STATES_TO_HA,
CONF_HA_STATES_TO_RISCO: DEFAULT_HA_STATES_TO_RISCO,
}

View File

@ -27,6 +27,28 @@
"code_arm_required": "Require pin code to arm",
"code_disarm_required": "Require pin code to disarm"
}
},
"risco_to_ha": {
"title": "Map Risco states to Home Assistant states",
"description": "Select what state your Home Assistant alarm will report for every state reported by Risco",
"data": {
"arm": "Armed (AWAY)",
"partial_arm": "Partially Armed (STAY)",
"A": "Group A",
"B": "Group B",
"C": "Group C",
"D": "Group D"
}
},
"ha_to_risco": {
"title": "Map Home Assistant states to Risco states",
"description": "Select what state to set your Risco alarm to when arming the Home Assistant alarm",
"data": {
"armed_away": "Armed Away",
"armed_home": "Armed Home",
"armed_night": "Armed Night",
"armed_custom_bypass": "Armed Custom Bypass"
}
}
}
}

View File

@ -2,6 +2,12 @@
import pytest
from homeassistant.components.alarm_control_panel import DOMAIN as ALARM_DOMAIN
from homeassistant.components.alarm_control_panel.const import (
SUPPORT_ALARM_ARM_AWAY,
SUPPORT_ALARM_ARM_CUSTOM_BYPASS,
SUPPORT_ALARM_ARM_HOME,
SUPPORT_ALARM_ARM_NIGHT,
)
from homeassistant.components.risco import CannotConnectError, UnauthorizedError
from homeassistant.components.risco.const import DOMAIN
from homeassistant.const import (
@ -9,10 +15,14 @@ from homeassistant.const import (
CONF_PIN,
CONF_USERNAME,
SERVICE_ALARM_ARM_AWAY,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
SERVICE_ALARM_ARM_HOME,
SERVICE_ALARM_ARM_NIGHT,
SERVICE_ALARM_DISARM,
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_CUSTOM_BYPASS,
STATE_ALARM_ARMED_HOME,
STATE_ALARM_ARMED_NIGHT,
STATE_ALARM_ARMING,
STATE_ALARM_DISARMED,
STATE_ALARM_TRIGGERED,
@ -34,6 +44,40 @@ FIRST_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_0"
SECOND_ENTITY_ID = "alarm_control_panel.risco_test_site_name_partition_1"
CODES_REQUIRED_OPTIONS = {"code_arm_required": True, "code_disarm_required": True}
TEST_RISCO_TO_HA = {
"arm": STATE_ALARM_ARMED_AWAY,
"partial_arm": STATE_ALARM_ARMED_HOME,
"A": STATE_ALARM_ARMED_HOME,
"B": STATE_ALARM_ARMED_HOME,
"C": STATE_ALARM_ARMED_NIGHT,
"D": STATE_ALARM_ARMED_NIGHT,
}
TEST_FULL_RISCO_TO_HA = {
**TEST_RISCO_TO_HA,
"D": STATE_ALARM_ARMED_CUSTOM_BYPASS,
}
TEST_HA_TO_RISCO = {
STATE_ALARM_ARMED_AWAY: "arm",
STATE_ALARM_ARMED_HOME: "partial_arm",
STATE_ALARM_ARMED_NIGHT: "C",
}
TEST_FULL_HA_TO_RISCO = {
**TEST_HA_TO_RISCO,
STATE_ALARM_ARMED_CUSTOM_BYPASS: "D",
}
CUSTOM_MAPPING_OPTIONS = {
"risco_states_to_ha": TEST_RISCO_TO_HA,
"ha_states_to_risco": TEST_HA_TO_RISCO,
}
FULL_CUSTOM_MAPPING = {
"risco_states_to_ha": TEST_FULL_RISCO_TO_HA,
"ha_states_to_risco": TEST_FULL_HA_TO_RISCO,
}
EXPECTED_FEATURES = (
SUPPORT_ALARM_ARM_AWAY | SUPPORT_ALARM_ARM_HOME | SUPPORT_ALARM_ARM_NIGHT
)
def _partition_mock():
@ -152,55 +196,68 @@ async def _check_state(hass, alarm, property, state, entity_id, partition_id):
async def test_states(hass, two_part_alarm):
"""Test the various alarm states."""
await _setup_risco(hass)
await _setup_risco(hass, CUSTOM_MAPPING_OPTIONS)
assert hass.states.get(FIRST_ENTITY_ID).state == STATE_UNKNOWN
await _check_state(
hass, two_part_alarm, "triggered", STATE_ALARM_TRIGGERED, FIRST_ENTITY_ID, 0
)
await _check_state(
hass, two_part_alarm, "triggered", STATE_ALARM_TRIGGERED, SECOND_ENTITY_ID, 1
)
await _check_state(
hass, two_part_alarm, "arming", STATE_ALARM_ARMING, FIRST_ENTITY_ID, 0
)
await _check_state(
hass, two_part_alarm, "arming", STATE_ALARM_ARMING, SECOND_ENTITY_ID, 1
)
await _check_state(
hass, two_part_alarm, "armed", STATE_ALARM_ARMED_AWAY, FIRST_ENTITY_ID, 0
)
await _check_state(
hass, two_part_alarm, "armed", STATE_ALARM_ARMED_AWAY, SECOND_ENTITY_ID, 1
)
await _check_state(
hass,
two_part_alarm,
"partially_armed",
STATE_ALARM_ARMED_HOME,
FIRST_ENTITY_ID,
0,
)
await _check_state(
hass,
two_part_alarm,
"partially_armed",
STATE_ALARM_ARMED_HOME,
SECOND_ENTITY_ID,
1,
)
await _check_state(
hass, two_part_alarm, "disarmed", STATE_ALARM_DISARMED, FIRST_ENTITY_ID, 0
)
await _check_state(
hass, two_part_alarm, "disarmed", STATE_ALARM_DISARMED, SECOND_ENTITY_ID, 1
)
for partition_id, entity_id in {0: FIRST_ENTITY_ID, 1: SECOND_ENTITY_ID}.items():
await _check_state(
hass,
two_part_alarm,
"triggered",
STATE_ALARM_TRIGGERED,
entity_id,
partition_id,
)
await _check_state(
hass, two_part_alarm, "arming", STATE_ALARM_ARMING, entity_id, partition_id
)
await _check_state(
hass,
two_part_alarm,
"armed",
STATE_ALARM_ARMED_AWAY,
entity_id,
partition_id,
)
await _check_state(
hass,
two_part_alarm,
"partially_armed",
STATE_ALARM_ARMED_HOME,
entity_id,
partition_id,
)
await _check_state(
hass,
two_part_alarm,
"disarmed",
STATE_ALARM_DISARMED,
entity_id,
partition_id,
)
groups = {"A": False, "B": False, "C": True, "D": False}
with patch.object(
two_part_alarm.partitions[partition_id],
"groups",
new_callable=PropertyMock(return_value=groups),
):
await _check_state(
hass,
two_part_alarm,
"partially_armed",
STATE_ALARM_ARMED_NIGHT,
entity_id,
partition_id,
)
async def _test_service_call(hass, service, method, entity_id, partition_id, **kwargs):
async def _test_service_call(
hass, service, method, entity_id, partition_id, *args, **kwargs
):
with patch(f"homeassistant.components.risco.RiscoAPI.{method}") as set_mock:
await _call_alarm_service(hass, service, entity_id, **kwargs)
set_mock.assert_awaited_once_with(partition_id)
set_mock.assert_awaited_once_with(partition_id, *args)
async def _test_no_service_call(
@ -219,9 +276,13 @@ async def _call_alarm_service(hass, service, entity_id, **kwargs):
)
async def test_sets(hass, two_part_alarm):
"""Test settings the various modes."""
await _setup_risco(hass)
async def test_sets_custom_mapping(hass, two_part_alarm):
"""Test settings the various modes when mapping some states."""
await _setup_risco(hass, CUSTOM_MAPPING_OPTIONS)
registry = await hass.helpers.entity_registry.async_get_registry()
entity = registry.async_get(FIRST_ENTITY_ID)
assert entity.supported_features == EXPECTED_FEATURES
await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0)
await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1)
@ -233,11 +294,51 @@ async def test_sets(hass, two_part_alarm):
await _test_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_ENTITY_ID, 0, "C"
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, "C"
)
async def test_sets_full_custom_mapping(hass, two_part_alarm):
"""Test settings the various modes when mapping all states."""
await _setup_risco(hass, FULL_CUSTOM_MAPPING)
registry = await hass.helpers.entity_registry.async_get_registry()
entity = registry.async_get(FIRST_ENTITY_ID)
assert (
entity.supported_features == EXPECTED_FEATURES | SUPPORT_ALARM_ARM_CUSTOM_BYPASS
)
await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", FIRST_ENTITY_ID, 0)
await _test_service_call(hass, SERVICE_ALARM_DISARM, "disarm", SECOND_ENTITY_ID, 1)
await _test_service_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", FIRST_ENTITY_ID, 0)
await _test_service_call(hass, SERVICE_ALARM_ARM_AWAY, "arm", SECOND_ENTITY_ID, 1)
await _test_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", FIRST_ENTITY_ID, 0
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_ENTITY_ID, 0, "C"
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, "C"
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "group_arm", FIRST_ENTITY_ID, 0, "D"
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "group_arm", SECOND_ENTITY_ID, 1, "D"
)
async def test_sets_with_correct_code(hass, two_part_alarm):
"""Test settings the various modes when code is required."""
await _setup_risco(hass, CODES_REQUIRED_OPTIONS)
await _setup_risco(hass, {**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS})
code = {"code": 1234}
await _test_service_call(
@ -258,11 +359,28 @@ async def test_sets_with_correct_code(hass, two_part_alarm):
await _test_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1, **code
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_ENTITY_ID, 0, "C", **code
)
await _test_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, "C", **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "partial_arm", FIRST_ENTITY_ID, 0, **code
)
await _test_no_service_call(
hass,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
"partial_arm",
SECOND_ENTITY_ID,
1,
**code,
)
async def test_sets_with_incorrect_code(hass, two_part_alarm):
"""Test settings the various modes when code is required and incorrect."""
await _setup_risco(hass, CODES_REQUIRED_OPTIONS)
await _setup_risco(hass, {**CUSTOM_MAPPING_OPTIONS, **CODES_REQUIRED_OPTIONS})
code = {"code": 4321}
await _test_no_service_call(
@ -283,3 +401,20 @@ async def test_sets_with_incorrect_code(hass, two_part_alarm):
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_HOME, "partial_arm", SECOND_ENTITY_ID, 1, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", FIRST_ENTITY_ID, 0, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_NIGHT, "group_arm", SECOND_ENTITY_ID, 1, **code
)
await _test_no_service_call(
hass, SERVICE_ALARM_ARM_CUSTOM_BYPASS, "partial_arm", FIRST_ENTITY_ID, 0, **code
)
await _test_no_service_call(
hass,
SERVICE_ALARM_ARM_CUSTOM_BYPASS,
"partial_arm",
SECOND_ENTITY_ID,
1,
**code,
)

View File

@ -1,4 +1,7 @@
"""Test the Risco config flow."""
import pytest
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.risco.config_flow import (
CannotConnectError,
@ -16,6 +19,27 @@ TEST_DATA = {
"pin": "1234",
}
TEST_RISCO_TO_HA = {
"arm": "armed_away",
"partial_arm": "armed_home",
"A": "armed_home",
"B": "armed_home",
"C": "armed_night",
"D": "armed_night",
}
TEST_HA_TO_RISCO = {
"armed_away": "arm",
"armed_home": "partial_arm",
"armed_night": "C",
}
TEST_OPTIONS = {
"scan_interval": 10,
"code_arm_required": True,
"code_disarm_required": True,
}
async def test_form(hass):
"""Test we get the form."""
@ -133,12 +157,6 @@ async def test_form_already_exists(hass):
async def test_options_flow(hass):
"""Test options flow."""
conf = {
"scan_interval": 10,
"code_arm_required": True,
"code_disarm_required": True,
}
entry = MockConfigEntry(
domain=DOMAIN,
unique_id=TEST_DATA["username"],
@ -147,16 +165,71 @@ async def test_options_flow(hass):
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=TEST_OPTIONS,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "risco_to_ha"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=TEST_RISCO_TO_HA,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "ha_to_risco"
with patch("homeassistant.components.risco.async_setup_entry", return_value=True):
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=conf,
user_input=TEST_HA_TO_RISCO,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert entry.options == conf
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert entry.options == {
**TEST_OPTIONS,
"risco_states_to_ha": TEST_RISCO_TO_HA,
"ha_states_to_risco": TEST_HA_TO_RISCO,
}
async def test_ha_to_risco_schema(hass):
"""Test that the schema for the ha-to-risco mapping step is generated properly."""
entry = MockConfigEntry(
domain=DOMAIN,
unique_id=TEST_DATA["username"],
data=TEST_DATA,
)
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=TEST_OPTIONS,
)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=TEST_RISCO_TO_HA,
)
# Test an HA state that isn't used
with pytest.raises(vol.error.MultipleInvalid):
await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={**TEST_HA_TO_RISCO, "armed_custom_bypass": "D"},
)
# Test a combo that can't be selected
with pytest.raises(vol.error.MultipleInvalid):
await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={**TEST_HA_TO_RISCO, "armed_night": "A"},
)