Purge entity and device registries when importing lcn from configuration.yaml (#54266)

* Identify LCN orphans in entity registry and device registry and remove them

* Fix typing issues

* Revert "Fix typing issues"

This reverts commit eccd067b3b5f23135e6c8a79d25f7f2cbc2d0ae9.

* Fix removal of devices which do not belong to given config_entry

* Use helper for getting entities for config_entry

* Rename variable
This commit is contained in:
Andre Lengwenus 2022-05-22 18:59:44 +02:00 committed by GitHub
parent 16088915eb
commit ead6e7da1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 11 deletions

View File

@ -2,7 +2,6 @@
from __future__ import annotations
import logging
from typing import Any
import pypck
@ -16,15 +15,16 @@ from homeassistant.const import (
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.typing import ConfigType
from .const import CONF_DIM_MODE, CONF_SK_NUM_TRIES, DOMAIN
from .helpers import purge_device_registry, purge_entity_registry
_LOGGER = logging.getLogger(__name__)
def get_config_entry(
hass: HomeAssistant, data: dict[str, Any]
hass: HomeAssistant, data: ConfigType
) -> config_entries.ConfigEntry | None:
"""Check config entries for already configured entries based on the ip address/port."""
return next(
@ -38,7 +38,7 @@ def get_config_entry(
)
async def validate_connection(host_name: str, data: dict[str, Any]) -> dict[str, Any]:
async def validate_connection(host_name: str, data: ConfigType) -> ConfigType:
"""Validate if a connection to LCN can be established."""
host = data[CONF_IP_ADDRESS]
port = data[CONF_PORT]
@ -70,7 +70,7 @@ class LcnFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_import(self, data: dict[str, Any]) -> FlowResult:
async def async_step_import(self, data: ConfigType) -> FlowResult:
"""Import existing configuration from LCN."""
host_name = data[CONF_HOST]
# validate the imported connection parameters
@ -93,13 +93,10 @@ class LcnFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
# check if we already have a host with the same address configured
if entry := get_config_entry(self.hass, data):
entry.source = config_entries.SOURCE_IMPORT
# Cleanup entity and device registry, if we imported from configuration.yaml to
# remove orphans when entities were removed from configuration
entity_registry = er.async_get(self.hass)
entity_registry.async_clear_config_entry(entry.entry_id)
device_registry = dr.async_get(self.hass)
device_registry.async_clear_config_entry(entry.entry_id)
purge_entity_registry(self.hass, entry.entry_id, data)
purge_device_registry(self.hass, entry.entry_id, data)
self.hass.config_entries.async_update_entry(entry, data=data)
return self.async_abort(reason="existing_configuration_updated")

View File

@ -30,7 +30,7 @@ from homeassistant.const import (
CONF_USERNAME,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.typing import ConfigType
from .const import (
@ -247,6 +247,75 @@ def import_lcn_config(lcn_config: ConfigType) -> list[ConfigType]:
return list(data.values())
def purge_entity_registry(
hass: HomeAssistant, entry_id: str, imported_entry_data: ConfigType
) -> None:
"""Remove orphans from entity registry which are not in entry data."""
entity_registry = er.async_get(hass)
# Find all entities that are referenced in the config entry.
references_config_entry = {
entity_entry.entity_id
for entity_entry in er.async_entries_for_config_entry(entity_registry, entry_id)
}
# Find all entities that are referenced by the entry_data.
references_entry_data = set()
for entity_data in imported_entry_data[CONF_ENTITIES]:
entity_unique_id = generate_unique_id(
entry_id, entity_data[CONF_ADDRESS], entity_data[CONF_RESOURCE]
)
entity_id = entity_registry.async_get_entity_id(
entity_data[CONF_DOMAIN], DOMAIN, entity_unique_id
)
if entity_id is not None:
references_entry_data.add(entity_id)
orphaned_ids = references_config_entry - references_entry_data
for orphaned_id in orphaned_ids:
entity_registry.async_remove(orphaned_id)
def purge_device_registry(
hass: HomeAssistant, entry_id: str, imported_entry_data: ConfigType
) -> None:
"""Remove orphans from device registry which are not in entry data."""
device_registry = dr.async_get(hass)
entity_registry = er.async_get(hass)
# Find all devices that are referenced in the entity registry.
references_entities = {
entry.device_id for entry in entity_registry.entities.values()
}
# Find device that references the host.
references_host = set()
host_device = device_registry.async_get_device({(DOMAIN, entry_id)})
if host_device is not None:
references_host.add(host_device.id)
# Find all devices that are referenced by the entry_data.
references_entry_data = set()
for device_data in imported_entry_data[CONF_DEVICES]:
device_unique_id = generate_unique_id(entry_id, device_data[CONF_ADDRESS])
device = device_registry.async_get_device({(DOMAIN, device_unique_id)})
if device is not None:
references_entry_data.add(device.id)
orphaned_ids = (
{
entry.id
for entry in dr.async_entries_for_config_entry(device_registry, entry_id)
}
- references_entities
- references_host
- references_entry_data
)
for device_id in orphaned_ids:
device_registry.async_remove_device(device_id)
def register_lcn_host_device(hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Register LCN host for given config_entry in device registry."""
device_registry = dr.async_get(hass)

View File

@ -7,6 +7,8 @@ import pytest
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.lcn.const import CONF_DIM_MODE, CONF_SK_NUM_TRIES, DOMAIN
from homeassistant.const import (
CONF_DEVICES,
CONF_ENTITIES,
CONF_HOST,
CONF_IP_ADDRESS,
CONF_PASSWORD,
@ -24,6 +26,8 @@ IMPORT_DATA = {
CONF_PASSWORD: "lcn",
CONF_SK_NUM_TRIES: 0,
CONF_DIM_MODE: "STEPS200",
CONF_DEVICES: [],
CONF_ENTITIES: [],
}