diff --git a/homeassistant/components/homekit/config_flow.py b/homeassistant/components/homekit/config_flow.py index a6984ae2121..d7c8ea65e2d 100644 --- a/homeassistant/components/homekit/config_flow.py +++ b/homeassistant/components/homekit/config_flow.py @@ -7,7 +7,7 @@ from operator import itemgetter import random import re import string -from typing import Any +from typing import Any, Final, TypedDict import voluptuous as vol @@ -34,12 +34,6 @@ from homeassistant.helpers import ( device_registry as dr, entity_registry as er, ) -from homeassistant.helpers.entityfilter import ( - CONF_EXCLUDE_DOMAINS, - CONF_EXCLUDE_ENTITIES, - CONF_INCLUDE_DOMAINS, - CONF_INCLUDE_ENTITIES, -) from homeassistant.loader import async_get_integrations from .const import ( @@ -69,13 +63,13 @@ MODE_EXCLUDE = "exclude" INCLUDE_EXCLUDE_MODES = [MODE_EXCLUDE, MODE_INCLUDE] -DOMAINS_NEED_ACCESSORY_MODE = [ +DOMAINS_NEED_ACCESSORY_MODE = { CAMERA_DOMAIN, LOCK_DOMAIN, MEDIA_PLAYER_DOMAIN, REMOTE_DOMAIN, -] -NEVER_BRIDGED_DOMAINS = [CAMERA_DOMAIN] +} +NEVER_BRIDGED_DOMAINS = {CAMERA_DOMAIN} CAMERA_ENTITY_PREFIX = f"{CAMERA_DOMAIN}." @@ -124,12 +118,34 @@ DEFAULT_DOMAINS = [ "water_heater", ] -_EMPTY_ENTITY_FILTER: dict[str, list[str]] = { - CONF_INCLUDE_DOMAINS: [], - CONF_EXCLUDE_DOMAINS: [], - CONF_INCLUDE_ENTITIES: [], - CONF_EXCLUDE_ENTITIES: [], -} +CONF_INCLUDE_DOMAINS: Final = "include_domains" +CONF_INCLUDE_ENTITIES: Final = "include_entities" +CONF_EXCLUDE_DOMAINS: Final = "exclude_domains" +CONF_EXCLUDE_ENTITIES: Final = "exclude_entities" + + +class EntityFilterDict(TypedDict, total=False): + """Entity filter dict.""" + + include_domains: list[str] + include_entities: list[str] + exclude_domains: list[str] + exclude_entities: list[str] + + +def _make_entity_filter( + include_domains: list[str] | None = None, + include_entities: list[str] | None = None, + exclude_domains: list[str] | None = None, + exclude_entities: list[str] | None = None, +) -> EntityFilterDict: + """Create a filter dict.""" + return EntityFilterDict( + include_domains=include_domains or [], + include_entities=include_entities or [], + exclude_domains=exclude_domains or [], + exclude_entities=exclude_entities or [], + ) async def _async_domain_names(hass: HomeAssistant, domains: list[str]) -> str: @@ -141,19 +157,18 @@ async def _async_domain_names(hass: HomeAssistant, domains: list[str]) -> str: @callback -def _async_build_entites_filter( +def _async_build_entities_filter( domains: list[str], entities: list[str] -) -> dict[str, Any]: +) -> EntityFilterDict: """Build an entities filter from domains and entities.""" - entity_filter = deepcopy(_EMPTY_ENTITY_FILTER) - entity_filter[CONF_INCLUDE_ENTITIES] = entities # Include all of the domain if there are no entities # explicitly included as the user selected the domain - domains_with_entities_selected = _domains_set_from_entities(entities) - entity_filter[CONF_INCLUDE_DOMAINS] = [ - domain for domain in domains if domain not in domains_with_entities_selected - ] - return entity_filter + return _make_entity_filter( + include_domains=sorted( + set(domains).difference(_domains_set_from_entities(entities)) + ), + include_entities=entities, + ) def _async_cameras_from_entities(entities: list[str]) -> dict[str, str]: @@ -190,13 +205,15 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): ) -> FlowResult: """Choose specific domains in bridge mode.""" if user_input is not None: - entity_filter = deepcopy(_EMPTY_ENTITY_FILTER) - entity_filter[CONF_INCLUDE_DOMAINS] = user_input[CONF_INCLUDE_DOMAINS] - self.hk_data[CONF_FILTER] = entity_filter + self.hk_data[CONF_FILTER] = _make_entity_filter( + include_domains=user_input[CONF_INCLUDE_DOMAINS] + ) return await self.async_step_pairing() self.hk_data[CONF_HOMEKIT_MODE] = HOMEKIT_MODE_BRIDGE - default_domains = [] if self._async_current_names() else DEFAULT_DOMAINS + default_domains = ( + [] if self._async_current_entries(include_ignore=False) else DEFAULT_DOMAINS + ) name_to_type_map = await _async_name_to_type_map(self.hass) return self.async_show_form( step_id="user", @@ -213,24 +230,28 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): self, user_input: dict[str, Any] | None = None ) -> FlowResult: """Pairing instructions.""" + hk_data = self.hk_data + if user_input is not None: port = async_find_next_available_port(self.hass, DEFAULT_CONFIG_FLOW_PORT) await self._async_add_entries_for_accessory_mode_entities(port) - self.hk_data[CONF_PORT] = port - include_domains_filter = self.hk_data[CONF_FILTER][CONF_INCLUDE_DOMAINS] - for domain in NEVER_BRIDGED_DOMAINS: - if domain in include_domains_filter: - include_domains_filter.remove(domain) + hk_data[CONF_PORT] = port + conf_filter: EntityFilterDict = hk_data[CONF_FILTER] + conf_filter[CONF_INCLUDE_DOMAINS] = [ + domain + for domain in conf_filter[CONF_INCLUDE_DOMAINS] + if domain not in NEVER_BRIDGED_DOMAINS + ] return self.async_create_entry( - title=f"{self.hk_data[CONF_NAME]}:{self.hk_data[CONF_PORT]}", - data=self.hk_data, + title=f"{hk_data[CONF_NAME]}:{hk_data[CONF_PORT]}", + data=hk_data, ) - self.hk_data[CONF_NAME] = self._async_available_name(SHORT_BRIDGE_NAME) - self.hk_data[CONF_EXCLUDE_ACCESSORY_MODE] = True + hk_data[CONF_NAME] = self._async_available_name(SHORT_BRIDGE_NAME) + hk_data[CONF_EXCLUDE_ACCESSORY_MODE] = True return self.async_show_form( step_id="pairing", - description_placeholders={CONF_NAME: self.hk_data[CONF_NAME]}, + description_placeholders={CONF_NAME: hk_data[CONF_NAME]}, ) async def _async_add_entries_for_accessory_mode_entities( @@ -265,14 +286,12 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): state = self.hass.states.get(entity_id) assert state is not None name = state.attributes.get(ATTR_FRIENDLY_NAME) or state.entity_id - entity_filter = _EMPTY_ENTITY_FILTER.copy() - entity_filter[CONF_INCLUDE_ENTITIES] = [entity_id] entry_data = { CONF_PORT: port, CONF_NAME: self._async_available_name(name), CONF_HOMEKIT_MODE: HOMEKIT_MODE_ACCESSORY, - CONF_FILTER: entity_filter, + CONF_FILTER: _make_entity_filter(include_entities=[entity_id]), } if entity_id.startswith(CAMERA_ENTITY_PREFIX): entry_data[CONF_ENTITY_CONFIG] = { @@ -360,26 +379,19 @@ class OptionsFlowHandler(config_entries.OptionsFlow): self, user_input: dict[str, Any] | None = None ) -> FlowResult: """Choose advanced options.""" - if ( - not self.show_advanced_options - or user_input is not None - or self.hk_options[CONF_HOMEKIT_MODE] != HOMEKIT_MODE_BRIDGE - ): + hk_options = self.hk_options + show_advanced_options = self.show_advanced_options + bridge_mode = hk_options[CONF_HOMEKIT_MODE] == HOMEKIT_MODE_BRIDGE + + if not show_advanced_options or user_input is not None or not bridge_mode: if user_input: - self.hk_options.update(user_input) - if ( - self.show_advanced_options - and self.hk_options[CONF_HOMEKIT_MODE] == HOMEKIT_MODE_BRIDGE - ): - self.hk_options[CONF_DEVICES] = user_input[CONF_DEVICES] - - for key in (CONF_DOMAINS, CONF_ENTITIES): - if key in self.hk_options: - del self.hk_options[key] - - if CONF_INCLUDE_EXCLUDE_MODE in self.hk_options: - del self.hk_options[CONF_INCLUDE_EXCLUDE_MODE] + hk_options.update(user_input) + if show_advanced_options and bridge_mode: + hk_options[CONF_DEVICES] = user_input[CONF_DEVICES] + hk_options.pop(CONF_DOMAINS, None) + hk_options.pop(CONF_ENTITIES, None) + hk_options.pop(CONF_INCLUDE_EXCLUDE_MODE, None) return self.async_create_entry(title="", data=self.hk_options) all_supported_devices = await _async_get_supported_devices(self.hass) @@ -404,35 +416,37 @@ class OptionsFlowHandler(config_entries.OptionsFlow): self, user_input: dict[str, Any] | None = None ) -> FlowResult: """Choose camera config.""" + hk_options = self.hk_options + all_entity_config: dict[str, dict[str, Any]] + if user_input is not None: - entity_config = self.hk_options[CONF_ENTITY_CONFIG] + all_entity_config = hk_options[CONF_ENTITY_CONFIG] for entity_id in self.included_cameras: + entity_config = all_entity_config.setdefault(entity_id, {}) + if entity_id in user_input[CONF_CAMERA_COPY]: - entity_config.setdefault(entity_id, {})[ - CONF_VIDEO_CODEC - ] = VIDEO_CODEC_COPY - elif ( - entity_id in entity_config - and CONF_VIDEO_CODEC in entity_config[entity_id] - ): - del entity_config[entity_id][CONF_VIDEO_CODEC] + entity_config[CONF_VIDEO_CODEC] = VIDEO_CODEC_COPY + elif CONF_VIDEO_CODEC in entity_config: + del entity_config[CONF_VIDEO_CODEC] + if entity_id in user_input[CONF_CAMERA_AUDIO]: - entity_config.setdefault(entity_id, {})[CONF_SUPPORT_AUDIO] = True - elif ( - entity_id in entity_config - and CONF_SUPPORT_AUDIO in entity_config[entity_id] - ): - del entity_config[entity_id][CONF_SUPPORT_AUDIO] + entity_config[CONF_SUPPORT_AUDIO] = True + elif CONF_SUPPORT_AUDIO in entity_config: + del entity_config[CONF_SUPPORT_AUDIO] + + if not entity_config: + all_entity_config.pop(entity_id) + return await self.async_step_advanced() cameras_with_audio = [] cameras_with_copy = [] - entity_config = self.hk_options.setdefault(CONF_ENTITY_CONFIG, {}) + all_entity_config = hk_options.setdefault(CONF_ENTITY_CONFIG, {}) for entity in self.included_cameras: - hk_entity_config = entity_config.get(entity, {}) - if hk_entity_config.get(CONF_VIDEO_CODEC) == VIDEO_CODEC_COPY: + entity_config = all_entity_config.get(entity, {}) + if entity_config.get(CONF_VIDEO_CODEC) == VIDEO_CODEC_COPY: cameras_with_copy.append(entity) - if hk_entity_config.get(CONF_SUPPORT_AUDIO): + if entity_config.get(CONF_SUPPORT_AUDIO): cameras_with_audio.append(entity) data_schema = vol.Schema( @@ -453,18 +467,20 @@ class OptionsFlowHandler(config_entries.OptionsFlow): self, user_input: dict[str, Any] | None = None ) -> FlowResult: """Choose entity for the accessory.""" - domains = self.hk_options[CONF_DOMAINS] + hk_options = self.hk_options + domains = hk_options[CONF_DOMAINS] + entity_filter: EntityFilterDict if user_input is not None: entities = cv.ensure_list(user_input[CONF_ENTITIES]) - entity_filter = _async_build_entites_filter(domains, entities) + entity_filter = _async_build_entities_filter(domains, entities) self.included_cameras = _async_cameras_from_entities(entities) - self.hk_options[CONF_FILTER] = entity_filter + hk_options[CONF_FILTER] = entity_filter if self.included_cameras: return await self.async_step_cameras() return await self.async_step_advanced() - entity_filter = self.hk_options.get(CONF_FILTER, {}) + entity_filter = hk_options.get(CONF_FILTER, {}) entities = entity_filter.get(CONF_INCLUDE_ENTITIES, []) all_supported_entities = _async_get_matching_entities( self.hass, domains, include_entity_category=True, include_hidden=True @@ -494,24 +510,21 @@ class OptionsFlowHandler(config_entries.OptionsFlow): self, user_input: dict[str, Any] | None = None ) -> FlowResult: """Choose entities to include from the domain on the bridge.""" - domains = self.hk_options[CONF_DOMAINS] + hk_options = self.hk_options + domains = hk_options[CONF_DOMAINS] if user_input is not None: entities = cv.ensure_list(user_input[CONF_ENTITIES]) - entity_filter = _async_build_entites_filter(domains, entities) self.included_cameras = _async_cameras_from_entities(entities) - self.hk_options[CONF_FILTER] = entity_filter + hk_options[CONF_FILTER] = _async_build_entities_filter(domains, entities) if self.included_cameras: return await self.async_step_cameras() return await self.async_step_advanced() - entity_filter = self.hk_options.get(CONF_FILTER, {}) + entity_filter: EntityFilterDict = hk_options.get(CONF_FILTER, {}) entities = entity_filter.get(CONF_INCLUDE_ENTITIES, []) - all_supported_entities = _async_get_matching_entities( self.hass, domains, include_entity_category=True, include_hidden=True ) - if not entities: - entities = entity_filter.get(CONF_EXCLUDE_ENTITIES, []) # Strip out entities that no longer exist to prevent error in the UI default_value = [ entity_id for entity_id in entities if entity_id in all_supported_entities @@ -535,15 +548,13 @@ class OptionsFlowHandler(config_entries.OptionsFlow): self, user_input: dict[str, Any] | None = None ) -> FlowResult: """Choose entities to exclude from the domain on the bridge.""" - domains = self.hk_options[CONF_DOMAINS] + hk_options = self.hk_options + domains = hk_options[CONF_DOMAINS] if user_input is not None: - entity_filter = deepcopy(_EMPTY_ENTITY_FILTER) - entities = cv.ensure_list(user_input[CONF_ENTITIES]) - entity_filter[CONF_INCLUDE_DOMAINS] = domains - entity_filter[CONF_EXCLUDE_ENTITIES] = entities self.included_cameras = {} - if CAMERA_DOMAIN in entity_filter[CONF_INCLUDE_DOMAINS]: + entities = cv.ensure_list(user_input[CONF_ENTITIES]) + if CAMERA_DOMAIN in domains: camera_entities = _async_get_matching_entities( self.hass, [CAMERA_DOMAIN] ) @@ -552,7 +563,9 @@ class OptionsFlowHandler(config_entries.OptionsFlow): for entity_id in camera_entities if entity_id not in entities } - self.hk_options[CONF_FILTER] = entity_filter + hk_options[CONF_FILTER] = _make_entity_filter( + include_domains=domains, exclude_entities=entities + ) if self.included_cameras: return await self.async_step_cameras() return await self.async_step_advanced() @@ -600,14 +613,13 @@ class OptionsFlowHandler(config_entries.OptionsFlow): self.hk_options = deepcopy(dict(self.config_entry.options)) homekit_mode = self.hk_options.get(CONF_HOMEKIT_MODE, DEFAULT_HOMEKIT_MODE) - entity_filter = self.hk_options.get(CONF_FILTER, {}) + entity_filter: EntityFilterDict = self.hk_options.get(CONF_FILTER, {}) include_exclude_mode = MODE_INCLUDE entities = entity_filter.get(CONF_INCLUDE_ENTITIES, []) if homekit_mode != HOMEKIT_MODE_ACCESSORY: include_exclude_mode = MODE_INCLUDE if entities else MODE_EXCLUDE domains = entity_filter.get(CONF_INCLUDE_DOMAINS, []) - include_entities = entity_filter.get(CONF_INCLUDE_ENTITIES) - if include_entities: + if include_entities := entity_filter.get(CONF_INCLUDE_ENTITIES): domains.extend(_domains_set_from_entities(include_entities)) name_to_type_map = await _async_name_to_type_map(self.hass) return self.async_show_form( @@ -708,7 +720,7 @@ def _async_get_entity_ids_for_accessory_mode( def _async_entity_ids_with_accessory_mode(hass: HomeAssistant) -> set[str]: """Return a set of entity ids that have config entries in accessory mode.""" - entity_ids = set() + entity_ids: set[str] = set() current_entries = hass.config_entries.async_entries(DOMAIN) for entry in current_entries: diff --git a/tests/components/homekit/test_config_flow.py b/tests/components/homekit/test_config_flow.py index 838f72be3c6..6dff9ef896e 100644 --- a/tests/components/homekit/test_config_flow.py +++ b/tests/components/homekit/test_config_flow.py @@ -889,7 +889,7 @@ async def test_options_flow_include_mode_with_cameras( "filter": { "exclude_domains": [], "exclude_entities": [], - "include_domains": ["fan", "vacuum", "climate"], + "include_domains": ["climate", "fan", "vacuum"], "include_entities": ["camera.native_h264", "camera.transcode_h264"], }, "entity_config": {"camera.native_h264": {"video_codec": "copy"}}, @@ -904,15 +904,15 @@ async def test_options_flow_include_mode_with_cameras( assert result["type"] == data_entry_flow.FlowResultType.FORM assert result["step_id"] == "init" assert result["data_schema"]({}) == { - "domains": ["fan", "vacuum", "climate", "camera"], + "domains": ["climate", "fan", "vacuum", "camera"], "mode": "bridge", "include_exclude_mode": "include", } schema = result["data_schema"].schema assert _get_schema_default(schema, "domains") == [ + "climate", "fan", "vacuum", - "climate", "camera", ] assert _get_schema_default(schema, "mode") == "bridge" @@ -921,7 +921,7 @@ async def test_options_flow_include_mode_with_cameras( result = await hass.config_entries.options.async_configure( result["flow_id"], user_input={ - "domains": ["fan", "vacuum", "climate", "camera"], + "domains": ["climate", "fan", "vacuum", "camera"], "include_exclude_mode": "exclude", }, ) @@ -959,11 +959,11 @@ async def test_options_flow_include_mode_with_cameras( assert result3["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY assert config_entry.options == { - "entity_config": {"camera.native_h264": {}}, + "entity_config": {}, "filter": { "exclude_domains": [], "exclude_entities": ["climate.old", "camera.excluded"], - "include_domains": ["fan", "vacuum", "climate", "camera"], + "include_domains": ["climate", "fan", "vacuum", "camera"], "include_entities": [], }, "mode": "bridge", @@ -1025,7 +1025,7 @@ async def test_options_flow_with_camera_audio( "filter": { "exclude_domains": [], "exclude_entities": [], - "include_domains": ["fan", "vacuum", "climate"], + "include_domains": ["climate", "fan", "vacuum"], "include_entities": ["camera.audio", "camera.no_audio"], }, "entity_config": {"camera.audio": {"support_audio": True}}, @@ -1040,15 +1040,15 @@ async def test_options_flow_with_camera_audio( assert result["type"] == data_entry_flow.FlowResultType.FORM assert result["step_id"] == "init" assert result["data_schema"]({}) == { - "domains": ["fan", "vacuum", "climate", "camera"], + "domains": ["climate", "fan", "vacuum", "camera"], "mode": "bridge", "include_exclude_mode": "include", } schema = result["data_schema"].schema assert _get_schema_default(schema, "domains") == [ + "climate", "fan", "vacuum", - "climate", "camera", ] assert _get_schema_default(schema, "mode") == "bridge" @@ -1058,7 +1058,7 @@ async def test_options_flow_with_camera_audio( result["flow_id"], user_input={ "include_exclude_mode": "exclude", - "domains": ["fan", "vacuum", "climate", "camera"], + "domains": ["climate", "fan", "vacuum", "camera"], }, ) @@ -1095,11 +1095,11 @@ async def test_options_flow_with_camera_audio( assert result3["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY assert config_entry.options == { - "entity_config": {"camera.audio": {}}, + "entity_config": {}, "filter": { "exclude_domains": [], "exclude_entities": ["climate.old", "camera.excluded"], - "include_domains": ["fan", "vacuum", "climate", "camera"], + "include_domains": ["climate", "fan", "vacuum", "camera"], "include_entities": [], }, "mode": "bridge",