Add support for parent_device field so entities are nested within Keypad Devices (#79513)

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Kevin Addeman 2022-10-09 14:39:12 -04:00 committed by GitHub
parent 618f259fd8
commit b7e84543c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 78 additions and 53 deletions

View File

@ -177,15 +177,15 @@ async def async_setup_entry(
buttons = bridge.buttons buttons = bridge.buttons
_async_register_bridge_device(hass, entry_id, bridge_device) _async_register_bridge_device(hass, entry_id, bridge_device)
button_devices = _async_register_button_devices( button_devices, device_info_by_device_id = _async_register_button_devices(
hass, entry_id, bridge_device, buttons hass, entry_id, bridge, bridge_device, buttons
) )
_async_subscribe_pico_remote_events(hass, bridge, buttons) _async_subscribe_pico_remote_events(hass, bridge, buttons)
# Store this bridge (keyed by entry_id) so it can be retrieved by the # Store this bridge (keyed by entry_id) so it can be retrieved by the
# platforms we're setting up. # platforms we're setting up.
hass.data[DOMAIN][entry_id] = LutronCasetaData( hass.data[DOMAIN][entry_id] = LutronCasetaData(
bridge, bridge_device, button_devices bridge, bridge_device, button_devices, device_info_by_device_id
) )
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS) await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
@ -213,34 +213,46 @@ def _async_register_bridge_device(
def _async_register_button_devices( def _async_register_button_devices(
hass: HomeAssistant, hass: HomeAssistant,
config_entry_id: str, config_entry_id: str,
bridge,
bridge_device, bridge_device,
button_devices_by_id: dict[int, dict], button_devices_by_id: dict[int, dict],
) -> dict[str, dict]: ) -> tuple[dict[str, dict], dict[int, dict[str, Any]]]:
"""Register button devices (Pico Remotes) in the device registry.""" """Register button devices (Pico Remotes) in the device registry."""
device_registry = dr.async_get(hass) device_registry = dr.async_get(hass)
button_devices_by_dr_id: dict[str, dict] = {} button_devices_by_dr_id: dict[str, dict] = {}
seen = set() device_info_by_device_id: dict[int, dict[str, Any]] = {}
seen: set[str] = set()
bridge_devices = bridge.get_devices()
for device in button_devices_by_id.values(): for device in button_devices_by_id.values():
if "serial" not in device or device["serial"] in seen:
ha_device = device
if "parent_device" in device and device["parent_device"] is not None:
# Device is a child of parent_device
# use the parent_device for HA device info
ha_device = bridge_devices[device["parent_device"]]
if "serial" not in ha_device or ha_device["serial"] in seen:
continue continue
seen.add(device["serial"]) seen.add(ha_device["serial"])
area, name = _area_and_name_from_name(device["name"])
area, name = _area_and_name_from_name(ha_device["name"])
device_args: dict[str, Any] = { device_args: dict[str, Any] = {
"name": f"{area} {name}", "name": f"{area} {name}",
"manufacturer": MANUFACTURER, "manufacturer": MANUFACTURER,
"config_entry_id": config_entry_id, "config_entry_id": config_entry_id,
"identifiers": {(DOMAIN, device["serial"])}, "identifiers": {(DOMAIN, ha_device["serial"])},
"model": f"{device['model']} ({device['type']})", "model": f"{ha_device['model']} ({ha_device['type']})",
"via_device": (DOMAIN, bridge_device["serial"]), "via_device": (DOMAIN, bridge_device["serial"]),
} }
if area != UNASSIGNED_AREA: if area != UNASSIGNED_AREA:
device_args["suggested_area"] = area device_args["suggested_area"] = area
dr_device = device_registry.async_get_or_create(**device_args) dr_device = device_registry.async_get_or_create(**device_args)
button_devices_by_dr_id[dr_device.id] = device button_devices_by_dr_id[dr_device.id] = ha_device
device_info_by_device_id.setdefault(ha_device["device_id"], device_args)
return button_devices_by_dr_id return button_devices_by_dr_id, device_info_by_device_id
def _area_and_name_from_name(device_name: str) -> tuple[str, str]: def _area_and_name_from_name(device_name: str) -> tuple[str, str]:
@ -282,16 +294,23 @@ def _async_subscribe_pico_remote_events(
else: else:
action = ACTION_RELEASE action = ACTION_RELEASE
type_ = _lutron_model_to_device_type(device["model"], device["type"]) bridge_devices = bridge_device.get_devices()
area, name = _area_and_name_from_name(device["name"]) ha_device = device
if "parent_device" in device and device["parent_device"] is not None:
# Device is a child of parent_device
# use the parent_device for HA device info
ha_device = bridge_devices[device["parent_device"]]
type_ = _lutron_model_to_device_type(ha_device["model"], ha_device["type"])
area, name = _area_and_name_from_name(ha_device["name"])
leap_button_number = device["button_number"] leap_button_number = device["button_number"]
lip_button_number = async_get_lip_button(type_, leap_button_number) lip_button_number = async_get_lip_button(type_, leap_button_number)
hass_device = dev_reg.async_get_device({(DOMAIN, device["serial"])}) hass_device = dev_reg.async_get_device({(DOMAIN, ha_device["serial"])})
hass.bus.async_fire( hass.bus.async_fire(
LUTRON_CASETA_BUTTON_EVENT, LUTRON_CASETA_BUTTON_EVENT,
{ {
ATTR_SERIAL: device["serial"], ATTR_SERIAL: ha_device["serial"],
ATTR_TYPE: type_, ATTR_TYPE: type_,
ATTR_BUTTON_NUMBER: lip_button_number, ATTR_BUTTON_NUMBER: lip_button_number,
ATTR_LEAP_BUTTON_NUMBER: leap_button_number, ATTR_LEAP_BUTTON_NUMBER: leap_button_number,
@ -327,7 +346,7 @@ class LutronCasetaDevice(Entity):
_attr_should_poll = False _attr_should_poll = False
def __init__(self, device, bridge, bridge_device): def __init__(self, device, data):
"""Set up the base class. """Set up the base class.
[:param]device the device metadata [:param]device the device metadata
@ -335,11 +354,24 @@ class LutronCasetaDevice(Entity):
[:param]bridge_device a dict with the details of the bridge [:param]bridge_device a dict with the details of the bridge
""" """
self._device = device self._device = device
self._smartbridge = bridge self._smartbridge = data.bridge
self._bridge_device = bridge_device self._bridge_device = data.bridge_device
self._bridge_unique_id = serial_to_unique_id(bridge_device["serial"]) self._bridge_unique_id = serial_to_unique_id(data.bridge_device["serial"])
if "serial" not in self._device: if "serial" not in self._device:
return return
if "parent_device" in device and (
parent_device_info := data.device_info_by_device_id.get(
device["parent_device"]
)
):
# Append the child device name to the end of the parent keypad name to create the entity name
self._attr_name = f'{parent_device_info["name"]} {device["device_name"]}'
# Set the device_info to the same as the Parent Keypad
# The entities will be nested inside the keypad device
self._attr_device_info = parent_device_info
return
area, name = _area_and_name_from_name(device["name"]) area, name = _area_and_name_from_name(device["name"])
self._attr_name = full_name = f"{area} {name}" self._attr_name = full_name = f"{area} {name}"
info = DeviceInfo( info = DeviceInfo(

View File

@ -28,10 +28,9 @@ async def async_setup_entry(
""" """
data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id] data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id]
bridge = data.bridge bridge = data.bridge
bridge_device = data.bridge_device
occupancy_groups = bridge.occupancy_groups occupancy_groups = bridge.occupancy_groups
async_add_entities( async_add_entities(
LutronOccupancySensor(occupancy_group, bridge, bridge_device) LutronOccupancySensor(occupancy_group, data)
for occupancy_group in occupancy_groups.values() for occupancy_group in occupancy_groups.values()
) )
@ -41,9 +40,9 @@ class LutronOccupancySensor(LutronCasetaDevice, BinarySensorEntity):
_attr_device_class = BinarySensorDeviceClass.OCCUPANCY _attr_device_class = BinarySensorDeviceClass.OCCUPANCY
def __init__(self, device, bridge, bridge_device): def __init__(self, device, data):
"""Init an occupancy sensor.""" """Init an occupancy sensor."""
super().__init__(device, bridge, bridge_device) super().__init__(device, data)
_, name = _area_and_name_from_name(device["name"]) _, name = _area_and_name_from_name(device["name"])
self._attr_name = name self._attr_name = name
self._attr_device_info = DeviceInfo( self._attr_device_info = DeviceInfo(

View File

@ -30,11 +30,9 @@ async def async_setup_entry(
""" """
data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id] data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id]
bridge = data.bridge bridge = data.bridge
bridge_device = data.bridge_device
cover_devices = bridge.get_devices_by_domain(DOMAIN) cover_devices = bridge.get_devices_by_domain(DOMAIN)
async_add_entities( async_add_entities(
LutronCasetaCover(cover_device, bridge, bridge_device) LutronCasetaCover(cover_device, data) for cover_device in cover_devices
for cover_device in cover_devices
) )

View File

@ -34,11 +34,8 @@ async def async_setup_entry(
""" """
data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id] data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id]
bridge = data.bridge bridge = data.bridge
bridge_device = data.bridge_device
fan_devices = bridge.get_devices_by_domain(DOMAIN) fan_devices = bridge.get_devices_by_domain(DOMAIN)
async_add_entities( async_add_entities(LutronCasetaFan(fan_device, data) for fan_device in fan_devices)
LutronCasetaFan(fan_device, bridge, bridge_device) for fan_device in fan_devices
)
class LutronCasetaFan(LutronCasetaDeviceUpdatableEntity, FanEntity): class LutronCasetaFan(LutronCasetaDeviceUpdatableEntity, FanEntity):

View File

@ -41,11 +41,9 @@ async def async_setup_entry(
""" """
data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id] data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id]
bridge = data.bridge bridge = data.bridge
bridge_device = data.bridge_device
light_devices = bridge.get_devices_by_domain(DOMAIN) light_devices = bridge.get_devices_by_domain(DOMAIN)
async_add_entities( async_add_entities(
LutronCasetaLight(light_device, bridge, bridge_device) LutronCasetaLight(light_device, data) for light_device in light_devices
for light_device in light_devices
) )

View File

@ -14,3 +14,4 @@ class LutronCasetaData:
bridge: Smartbridge bridge: Smartbridge
bridge_device: dict[str, Any] bridge_device: dict[str, Any]
button_devices: dict[str, dict] button_devices: dict[str, dict]
device_info_by_device_id: dict[int, dict[str, Any]]

View File

@ -27,23 +27,20 @@ async def async_setup_entry(
""" """
data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id] data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id]
bridge = data.bridge bridge = data.bridge
bridge_device = data.bridge_device
scenes = bridge.get_scenes() scenes = bridge.get_scenes()
async_add_entities( async_add_entities(LutronCasetaScene(scenes[scene], data) for scene in scenes)
LutronCasetaScene(scenes[scene], bridge, bridge_device) for scene in scenes
)
class LutronCasetaScene(Scene): class LutronCasetaScene(Scene):
"""Representation of a Lutron Caseta scene.""" """Representation of a Lutron Caseta scene."""
def __init__(self, scene, bridge, bridge_device): def __init__(self, scene, data):
"""Initialize the Lutron Caseta scene.""" """Initialize the Lutron Caseta scene."""
self._scene_id = scene["scene_id"] self._scene_id = scene["scene_id"]
self._bridge: Smartbridge = bridge self._bridge: Smartbridge = data.bridge
bridge_unique_id = serial_to_unique_id(bridge_device["serial"]) bridge_unique_id = serial_to_unique_id(data.bridge_device["serial"])
self._attr_device_info = DeviceInfo( self._attr_device_info = DeviceInfo(
identifiers={(CASETA_DOMAIN, bridge_device["serial"])}, identifiers={(CASETA_DOMAIN, data.bridge_device["serial"])},
) )
self._attr_name = _area_and_name_from_name(scene["name"])[1] self._attr_name = _area_and_name_from_name(scene["name"])[1]
self._attr_unique_id = f"scene_{bridge_unique_id}_{self._scene_id}" self._attr_unique_id = f"scene_{bridge_unique_id}_{self._scene_id}"

View File

@ -24,11 +24,9 @@ async def async_setup_entry(
""" """
data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id] data: LutronCasetaData = hass.data[CASETA_DOMAIN][config_entry.entry_id]
bridge = data.bridge bridge = data.bridge
bridge_device = data.bridge_device
switch_devices = bridge.get_devices_by_domain(DOMAIN) switch_devices = bridge.get_devices_by_domain(DOMAIN)
async_add_entities( async_add_entities(
LutronCasetaLight(switch_device, bridge, bridge_device) LutronCasetaLight(switch_device, data) for switch_device in switch_devices
for switch_device in switch_devices
) )

View File

@ -34,6 +34,7 @@ from tests.common import (
MOCK_BUTTON_DEVICES = [ MOCK_BUTTON_DEVICES = [
{ {
"device_id": "710",
"Name": "Back Hall Pico", "Name": "Back Hall Pico",
"ID": 2, "ID": 2,
"Area": {"Name": "Back Hall"}, "Area": {"Name": "Back Hall"},
@ -50,6 +51,7 @@ MOCK_BUTTON_DEVICES = [
"serial": 43845548, "serial": 43845548,
}, },
{ {
"device_id": "742",
"Name": "Front Steps Sunnata Keypad", "Name": "Front Steps Sunnata Keypad",
"ID": 3, "ID": 3,
"Area": {"Name": "Front Steps"}, "Area": {"Name": "Front Steps"},
@ -87,19 +89,22 @@ async def _async_setup_lutron_with_picos(hass, device_reg):
config_entry = MockConfigEntry(domain=DOMAIN, data={}) config_entry = MockConfigEntry(domain=DOMAIN, data={})
config_entry.add_to_hass(hass) config_entry.add_to_hass(hass)
dr_button_devices = {} dr_button_devices = {}
device_info_by_device_id = {}
for device in MOCK_BUTTON_DEVICES: for device in MOCK_BUTTON_DEVICES:
dr_device = device_reg.async_get_or_create( device_args = {
name=device["leap_name"], "name": device["leap_name"],
manufacturer=MANUFACTURER, "manufacturer": MANUFACTURER,
config_entry_id=config_entry.entry_id, "config_entry_id": config_entry.entry_id,
identifiers={(DOMAIN, device["serial"])}, "identifiers": {(DOMAIN, device["serial"])},
model=f"{device['model']} ({device[CONF_TYPE]})", "model": f"{device['model']} ({device[CONF_TYPE]})",
) }
dr_device = device_reg.async_get_or_create(**device_args)
dr_button_devices[dr_device.id] = device dr_button_devices[dr_device.id] = device
device_info_by_device_id.setdefault(device["device_id"], device_args)
hass.data[DOMAIN][config_entry.entry_id] = LutronCasetaData( hass.data[DOMAIN][config_entry.entry_id] = LutronCasetaData(
MagicMock(), MagicMock(), dr_button_devices MagicMock(), MagicMock(), dr_button_devices, device_info_by_device_id
) )
return config_entry.entry_id return config_entry.entry_id