Exclude own entity from group entity selector (#68782)

This commit is contained in:
Erik Montnemery 2022-03-30 18:07:47 +02:00 committed by GitHub
parent 463cb8679f
commit 2c92d19058
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 35 deletions

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from collections.abc import Callable, Mapping
from functools import partial
from typing import Any, cast
import voluptuous as vol
@ -13,6 +14,8 @@ from homeassistant.helpers.helper_config_entry_flow import (
HelperConfigFlowHandler,
HelperFlowFormStep,
HelperFlowMenuStep,
HelperOptionsFlowHandler,
entity_selector_without_own_entities,
)
from . import DOMAIN
@ -20,12 +23,17 @@ from .binary_sensor import CONF_ALL
from .const import CONF_HIDE_MEMBERS
def basic_group_options_schema(domain: str) -> vol.Schema:
def basic_group_options_schema(
domain: str,
handler: HelperConfigFlowHandler | HelperOptionsFlowHandler,
options: dict[str, Any],
) -> vol.Schema:
"""Generate options schema."""
handler = cast(HelperOptionsFlowHandler, handler)
return vol.Schema(
{
vol.Required(CONF_ENTITIES): selector.selector(
{"entity": {"domain": domain, "multiple": True}}
vol.Required(CONF_ENTITIES): entity_selector_without_own_entities(
handler, {"domain": domain, "multiple": True}
),
vol.Required(CONF_HIDE_MEMBERS, default=False): selector.selector(
{"boolean": {}}
@ -36,36 +44,52 @@ def basic_group_options_schema(domain: str) -> vol.Schema:
def basic_group_config_schema(domain: str) -> vol.Schema:
"""Generate config schema."""
return vol.Schema({vol.Required("name"): selector.selector({"text": {}})}).extend(
basic_group_options_schema(domain).schema
return vol.Schema(
{
vol.Required("name"): selector.selector({"text": {}}),
vol.Required(CONF_ENTITIES): selector.selector(
{"entity": {"domain": domain, "multiple": True}}
),
vol.Required(CONF_HIDE_MEMBERS, default=False): selector.selector(
{"boolean": {}}
),
}
)
BINARY_SENSOR_OPTIONS_SCHEMA = basic_group_options_schema("binary_sensor").extend(
def binary_sensor_options_schema(
handler: HelperConfigFlowHandler | HelperOptionsFlowHandler,
options: dict[str, Any],
) -> vol.Schema:
"""Generate options schema."""
return basic_group_options_schema("binary_sensor", handler, options).extend(
{
vol.Required(CONF_ALL, default=False): selector.selector({"boolean": {}}),
}
)
BINARY_SENSOR_CONFIG_SCHEMA = basic_group_config_schema("binary_sensor").extend(
{
vol.Required(CONF_ALL, default=False): selector.selector({"boolean": {}}),
}
)
LIGHT_OPTIONS_SCHEMA = basic_group_options_schema("light").extend(
{
vol.Required(
CONF_ALL, default=False, description={"advanced": True}
): selector.selector({"boolean": {}}),
}
)
SWITCH_OPTIONS_SCHEMA = basic_group_options_schema("switch").extend(
{
vol.Required(
CONF_ALL, default=False, description={"advanced": True}
): selector.selector({"boolean": {}}),
}
)
def light_switch_options_schema(
domain: str,
handler: HelperConfigFlowHandler | HelperOptionsFlowHandler,
options: dict[str, Any],
) -> vol.Schema:
"""Generate options schema."""
return basic_group_options_schema(domain, handler, options).extend(
{
vol.Required(
CONF_ALL, default=False, description={"advanced": True}
): selector.selector({"boolean": {}}),
}
)
BINARY_SENSOR_CONFIG_SCHEMA = vol.Schema(
{vol.Required("name"): selector.selector({"text": {}})}
).extend(BINARY_SENSOR_OPTIONS_SCHEMA.schema)
GROUP_TYPES = [
"binary_sensor",
@ -121,13 +145,15 @@ CONFIG_FLOW: dict[str, HelperFlowFormStep | HelperFlowMenuStep] = {
OPTIONS_FLOW: dict[str, HelperFlowFormStep | HelperFlowMenuStep] = {
"init": HelperFlowFormStep(None, next_step=choose_options_step),
"binary_sensor": HelperFlowFormStep(BINARY_SENSOR_OPTIONS_SCHEMA),
"cover": HelperFlowFormStep(basic_group_options_schema("cover")),
"fan": HelperFlowFormStep(basic_group_options_schema("fan")),
"light": HelperFlowFormStep(LIGHT_OPTIONS_SCHEMA),
"lock": HelperFlowFormStep(basic_group_options_schema("lock")),
"media_player": HelperFlowFormStep(basic_group_options_schema("media_player")),
"switch": HelperFlowFormStep(SWITCH_OPTIONS_SCHEMA),
"binary_sensor": HelperFlowFormStep(binary_sensor_options_schema),
"cover": HelperFlowFormStep(partial(basic_group_options_schema, "cover")),
"fan": HelperFlowFormStep(partial(basic_group_options_schema, "fan")),
"light": HelperFlowFormStep(partial(light_switch_options_schema, "light")),
"lock": HelperFlowFormStep(partial(basic_group_options_schema, "lock")),
"media_player": HelperFlowFormStep(
partial(basic_group_options_schema, "media_player")
),
"switch": HelperFlowFormStep(partial(light_switch_options_schema, "switch")),
}

View File

@ -14,7 +14,7 @@ from homeassistant import config_entries
from homeassistant.core import HomeAssistant, callback, split_entity_id
from homeassistant.data_entry_flow import FlowResult, UnknownHandler
from . import entity_registry as er
from . import entity_registry as er, selector
class HelperFlowError(Exception):
@ -27,7 +27,10 @@ class HelperFlowFormStep:
# Optional schema for requesting and validating user input. If schema validation
# fails, the step will be retried. If the schema is None, no user input is requested.
schema: vol.Schema | None
schema: vol.Schema | Callable[
[HelperConfigFlowHandler | HelperOptionsFlowHandler, dict[str, Any]],
vol.Schema | None,
] | None
# Optional function to validate user input.
# The validate_user_input function is called if the schema validates successfully.
@ -42,6 +45,20 @@ class HelperFlowFormStep:
# If next_step returns None, the flow is ended with RESULT_TYPE_CREATE_ENTRY.
next_step: Callable[[dict[str, Any]], str | None] = lambda _: None
# Optional function to allow amending a form schema.
# The update_form_schema function is called before async_show_form is called. The
# update_form_schema function is passed the handler, which is either an instance of
# HelperConfigFlowHandler or HelperOptionsFlowHandler, the schema, and the union of
# config entry options and user input from previous steps.
update_form_schema: Callable[
[
HelperConfigFlowHandler | HelperOptionsFlowHandler,
vol.Schema,
dict[str, Any],
],
vol.Schema,
] = lambda _handler, schema, _options: schema
@dataclass
class HelperFlowMenuStep:
@ -73,6 +90,15 @@ class HelperCommonFlowHandler:
return await self._async_form_step(step_id, user_input)
return await self._async_menu_step(step_id, user_input)
def _get_schema(
self, form_step: HelperFlowFormStep, options: dict[str, Any]
) -> vol.Schema | None:
if form_step.schema is None:
return None
if isinstance(form_step.schema, vol.Schema):
return form_step.schema
return form_step.schema(self._handler, options)
async def _async_form_step(
self, step_id: str, user_input: dict[str, Any] | None = None
) -> FlowResult:
@ -81,7 +107,7 @@ class HelperCommonFlowHandler:
if (
user_input is not None
and (data_schema := form_step.schema)
and (data_schema := self._get_schema(form_step, self._options))
and data_schema.schema
and not self._handler.show_advanced_options
):
@ -133,7 +159,10 @@ class HelperCommonFlowHandler:
options = dict(self._options)
if user_input:
options.update(user_input)
if (data_schema := form_step.schema) and data_schema.schema:
if (
data_schema := self._get_schema(form_step, self._options)
) and data_schema.schema:
# Make a copy of the schema with suggested values set to saved options
schema = {}
for key, val in data_schema.schema.items():
@ -282,7 +311,7 @@ class HelperOptionsFlowHandler(config_entries.OptionsFlow):
) -> None:
"""Initialize options flow."""
self._common_handler = HelperCommonFlowHandler(self, options_flow, config_entry)
self._config_entry = config_entry
self.config_entry = config_entry
self._async_options_flow_finished = async_options_flow_finished
for step in options_flow:
@ -337,3 +366,21 @@ def wrapped_entity_config_entry_title(
if state:
return state.name or object_id
return object_id
@callback
def entity_selector_without_own_entities(
handler: HelperOptionsFlowHandler,
entity_selector_config: dict[str, Any],
) -> vol.Schema:
"""Return an entity selector which excludes own entities."""
entity_registry = er.async_get(handler.hass)
entities = er.async_entries_for_config_entry(
entity_registry,
handler.config_entry.entry_id, # pylint: disable=protected-access
)
entity_ids = [ent.entity_id for ent in entities]
return selector.selector(
{"entity": {**entity_selector_config, "exclude_entities": entity_ids}}
)

View File

@ -224,6 +224,9 @@ async def test_options(
assert result["step_id"] == group_type
assert get_suggested(result["data_schema"].schema, "entities") == members1
assert "name" not in result["data_schema"].schema
assert result["data_schema"].schema["entities"].config["exclude_entities"] == [
f"{group_type}.bed_room"
]
result = await hass.config_entries.options.async_configure(
result["flow_id"],