Fix lutron caseta triggers when device fails to setup before startup finishes (#82714)

Refactor the trigger attach so its not dependant on the intergration
being already setup

fixes #82495 & fixes #81999
This commit is contained in:
J. Nick Koston 2022-11-29 15:46:01 -10:00 committed by GitHub
parent 43c8adc5ec
commit a3ec9529ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 170 additions and 87 deletions

View File

@ -6,7 +6,7 @@ import contextlib
from itertools import chain
import logging
import ssl
from typing import Any
from typing import Any, cast
import async_timeout
from pylutron_caseta import BUTTON_STATUS_PRESSED
@ -28,6 +28,7 @@ from .const import (
ATTR_ACTION,
ATTR_AREA_NAME,
ATTR_BUTTON_NUMBER,
ATTR_BUTTON_TYPE,
ATTR_DEVICE_NAME,
ATTR_LEAP_BUTTON_NUMBER,
ATTR_SERIAL,
@ -50,7 +51,21 @@ from .device_trigger import (
LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP,
LUTRON_BUTTON_TRIGGER_SCHEMA,
)
from .models import LutronButton, LutronCasetaData, LutronKeypad, LutronKeypadData
from .models import (
LUTRON_BUTTON_LEAP_BUTTON_NUMBER,
LUTRON_KEYPAD_AREA_NAME,
LUTRON_KEYPAD_BUTTONS,
LUTRON_KEYPAD_DEVICE_REGISTRY_DEVICE_ID,
LUTRON_KEYPAD_LUTRON_DEVICE_ID,
LUTRON_KEYPAD_MODEL,
LUTRON_KEYPAD_NAME,
LUTRON_KEYPAD_SERIAL,
LUTRON_KEYPAD_TYPE,
LutronButton,
LutronCasetaData,
LutronKeypad,
LutronKeypadData,
)
from .util import serial_to_unique_id
_LOGGER = logging.getLogger(__name__)
@ -225,57 +240,77 @@ def _async_setup_keypads(
hass: HomeAssistant,
config_entry_id: str,
bridge: Smartbridge,
bridge_device: dict[str, Any],
bridge_device: dict[str, str | int],
) -> LutronKeypadData:
"""Register keypad devices (Keypads and Pico Remotes) in the device registry."""
device_registry = dr.async_get(hass)
bridge_devices = bridge.get_devices()
bridge_buttons = bridge.buttons
bridge_devices: dict[str, dict[str, str | int]] = bridge.get_devices()
bridge_buttons: dict[str, dict[str, str | int]] = bridge.buttons
dr_device_id_to_keypad: dict[str, LutronKeypad] = {}
keypads: dict[int, LutronKeypad] = {}
keypad_buttons: dict[int, LutronButton] = {}
keypad_button_names_to_leap: dict[int, dict[str, int]] = {}
leap_to_keypad_button_names: dict[int, dict[int, str]] = {}
for bridge_button in bridge_buttons.values():
bridge_keypad = bridge_devices[bridge_button["parent_device"]]
keypad_device_id = bridge_keypad["device_id"]
button_device_id = bridge_button["device_id"]
parent_device = cast(str, bridge_button["parent_device"])
bridge_keypad = bridge_devices[parent_device]
keypad_lutron_device_id = cast(int, bridge_keypad["device_id"])
button_lutron_device_id = cast(int, bridge_button["device_id"])
leap_button_number = cast(int, bridge_button["button_number"])
button_led_device_id = None
if "button_led" in bridge_button:
button_led_device_id = cast(str, bridge_button["button_led"])
if not (keypad := keypads.get(keypad_device_id)):
if not (keypad := keypads.get(keypad_lutron_device_id)):
# First time seeing this keypad, build keypad data and store in keypads
keypad = keypads[keypad_device_id] = _async_build_lutron_keypad(
bridge, bridge_device, bridge_keypad, keypad_device_id
keypad = keypads[keypad_lutron_device_id] = _async_build_lutron_keypad(
bridge, bridge_device, bridge_keypad, keypad_lutron_device_id
)
# Register the keypad device
dr_device = device_registry.async_get_or_create(
**keypad["device_info"], config_entry_id=config_entry_id
)
keypad["dr_device_id"] = dr_device.id
keypad[LUTRON_KEYPAD_DEVICE_REGISTRY_DEVICE_ID] = dr_device.id
dr_device_id_to_keypad[dr_device.id] = keypad
button_name = _get_button_name(keypad, bridge_button)
keypad_lutron_device_id = keypad[LUTRON_KEYPAD_LUTRON_DEVICE_ID]
# Add button to parent keypad, and build keypad_buttons and keypad_button_names_to_leap
button = keypad_buttons[button_device_id] = LutronButton(
lutron_device_id=button_device_id,
leap_button_number=bridge_button["button_number"],
button_name=_get_button_name(keypad, bridge_button),
led_device_id=bridge_button.get("button_led"),
parent_keypad=keypad["lutron_device_id"],
keypad_buttons[button_lutron_device_id] = LutronButton(
lutron_device_id=button_lutron_device_id,
leap_button_number=leap_button_number,
button_name=button_name,
led_device_id=button_led_device_id,
parent_keypad=keypad_lutron_device_id,
)
keypad["buttons"].append(button["lutron_device_id"])
keypad[LUTRON_KEYPAD_BUTTONS].append(button_lutron_device_id)
keypad_button_names_to_leap.setdefault(keypad["lutron_device_id"], {}).update(
{button["button_name"]: int(button["leap_button_number"])}
button_name_to_leap = keypad_button_names_to_leap.setdefault(
keypad_lutron_device_id, {}
)
button_name_to_leap[button_name] = leap_button_number
leap_to_button_name = leap_to_keypad_button_names.setdefault(
keypad_lutron_device_id, {}
)
leap_to_button_name[leap_button_number] = button_name
keypad_trigger_schemas = _async_build_trigger_schemas(keypad_button_names_to_leap)
_async_subscribe_keypad_events(hass, bridge, keypads, keypad_buttons)
_async_subscribe_keypad_events(
hass=hass,
bridge=bridge,
keypads=keypads,
keypad_buttons=keypad_buttons,
leap_to_keypad_button_names=leap_to_keypad_button_names,
)
return LutronKeypadData(
dr_device_id_to_keypad,
@ -312,7 +347,6 @@ def _async_build_lutron_keypad(
keypad_device_id: int,
) -> LutronKeypad:
# First time seeing this keypad, build keypad data and store in keypads
area_name = _area_name_from_id(bridge.areas, bridge_keypad["area"])
keypad_name = bridge_keypad["name"].split("_")[-1]
keypad_serial = _handle_none_keypad_serial(bridge_keypad, bridge_device["serial"])
@ -350,7 +384,7 @@ def _get_button_name(keypad: LutronKeypad, bridge_button: dict[str, Any]) -> str
# This is a Caseta Button retrieve name from hardcoded trigger definitions.
return _get_button_name_from_triggers(keypad, button_number)
keypad_model = keypad["model"]
keypad_model = keypad[LUTRON_KEYPAD_MODEL]
if keypad_model_override := KEYPAD_LEAP_BUTTON_NAME_OVERRIDE.get(keypad_model):
if alt_button_name := keypad_model_override.get(button_number):
return alt_button_name
@ -412,8 +446,9 @@ def async_get_lip_button(device_type: str, leap_button: int) -> int | None:
def _async_subscribe_keypad_events(
hass: HomeAssistant,
bridge: Smartbridge,
keypads: dict[int, Any],
keypad_buttons: dict[int, Any],
keypads: dict[int, LutronKeypad],
keypad_buttons: dict[int, LutronButton],
leap_to_keypad_button_names: dict[int, dict[int, str]],
):
"""Subscribe to lutron events."""
@ -429,20 +464,25 @@ def _async_subscribe_keypad_events(
else:
action = ACTION_RELEASE
keypad_type = keypad["type"]
leap_button_number = button["leap_button_number"]
keypad_type = keypad[LUTRON_KEYPAD_TYPE]
keypad_device_id = keypad[LUTRON_KEYPAD_LUTRON_DEVICE_ID]
leap_button_number = button[LUTRON_BUTTON_LEAP_BUTTON_NUMBER]
lip_button_number = async_get_lip_button(keypad_type, leap_button_number)
button_type = LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP.get(
keypad_type, leap_to_keypad_button_names[keypad_device_id]
)[leap_button_number]
hass.bus.async_fire(
LUTRON_CASETA_BUTTON_EVENT,
{
ATTR_SERIAL: keypad["serial"],
ATTR_SERIAL: keypad[LUTRON_KEYPAD_SERIAL],
ATTR_TYPE: keypad_type,
ATTR_BUTTON_NUMBER: lip_button_number,
ATTR_LEAP_BUTTON_NUMBER: leap_button_number,
ATTR_DEVICE_NAME: keypad["name"],
ATTR_DEVICE_ID: keypad["dr_device_id"],
ATTR_AREA_NAME: keypad["area_name"],
ATTR_DEVICE_NAME: keypad[LUTRON_KEYPAD_NAME],
ATTR_DEVICE_ID: keypad[LUTRON_KEYPAD_DEVICE_REGISTRY_DEVICE_ID],
ATTR_AREA_NAME: keypad[LUTRON_KEYPAD_AREA_NAME],
ATTR_BUTTON_TYPE: button_type,
ATTR_ACTION: action,
},
)

View File

@ -18,6 +18,7 @@ MANUFACTURER = "Lutron Electronics Co., Inc"
ATTR_SERIAL = "serial"
ATTR_TYPE = "type"
ATTR_BUTTON_TYPE = "button_type"
ATTR_LEAP_BUTTON_NUMBER = "leap_button_number"
ATTR_BUTTON_NUMBER = "button_number" # LIP button number
ATTR_DEVICE_NAME = "device_name"

View File

@ -6,9 +6,6 @@ import logging
import voluptuous as vol
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
from homeassistant.components.device_automation.exceptions import (
InvalidDeviceAutomationConfig,
)
from homeassistant.components.homeassistant.triggers import event as event_trigger
from homeassistant.const import (
CONF_DEVICE_ID,
@ -18,7 +15,6 @@ from homeassistant.const import (
CONF_TYPE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.typing import ConfigType
@ -26,8 +22,7 @@ from .const import (
ACTION_PRESS,
ACTION_RELEASE,
ATTR_ACTION,
ATTR_LEAP_BUTTON_NUMBER,
ATTR_SERIAL,
ATTR_BUTTON_TYPE,
CONF_SUBTYPE,
DOMAIN,
LUTRON_CASETA_BUTTON_EVENT,
@ -317,7 +312,7 @@ DEVICE_TYPE_SUBTYPE_MAP_TO_LEAP = {
"FourGroupRemote": FOUR_GROUP_REMOTE_BUTTON_TYPES_TO_LEAP,
}
LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP = {
LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP: dict[str, dict[int, str]] = {
k: _reverse_dict(v) for k, v in DEVICE_TYPE_SUBTYPE_MAP_TO_LEAP.items()
}
@ -421,53 +416,22 @@ async def async_attach_trigger(
trigger_info: TriggerInfo,
) -> CALLBACK_TYPE:
"""Attach a trigger."""
device_id = config[CONF_DEVICE_ID]
subtype = config[CONF_SUBTYPE]
if not (data := get_lutron_data_by_dr_id(hass, device_id)) or not (
keypad := data.keypad_data.dr_device_id_to_keypad[device_id]
):
raise HomeAssistantError(
f"Cannot attach trigger {config} because device with id {device_id} is missing or invalid"
)
keypad_trigger_schemas = data.keypad_data.trigger_schemas
keypad_button_names_to_leap = data.keypad_data.button_names_to_leap
device_type = keypad["type"]
serial = keypad["serial"]
lutron_device_id = keypad["lutron_device_id"]
# Retrieve trigger schema, preferring hard-coded triggers from device_trigger.py
schema = DEVICE_TYPE_SCHEMA_MAP.get(
device_type,
keypad_trigger_schemas[lutron_device_id],
)
# Retrieve list of valid buttons, preferring hard-coded triggers from device_trigger.py
valid_buttons = DEVICE_TYPE_SUBTYPE_MAP_TO_LEAP.get(
device_type,
keypad_button_names_to_leap[lutron_device_id],
)
if subtype not in valid_buttons:
raise InvalidDeviceAutomationConfig(
f"Cannot attach trigger {config} because subtype {subtype} is invalid"
)
config = schema(config)
event_config = {
event_trigger.CONF_PLATFORM: CONF_EVENT,
event_trigger.CONF_EVENT_TYPE: LUTRON_CASETA_BUTTON_EVENT,
event_trigger.CONF_EVENT_DATA: {
ATTR_SERIAL: serial,
ATTR_LEAP_BUTTON_NUMBER: valid_buttons[subtype],
ATTR_ACTION: config[CONF_TYPE],
},
}
event_config = event_trigger.TRIGGER_SCHEMA(event_config)
return await event_trigger.async_attach_trigger(
hass, event_config, action, trigger_info, platform_type="device"
hass,
event_trigger.TRIGGER_SCHEMA(
{
event_trigger.CONF_PLATFORM: CONF_EVENT,
event_trigger.CONF_EVENT_TYPE: LUTRON_CASETA_BUTTON_EVENT,
event_trigger.CONF_EVENT_DATA: {
CONF_DEVICE_ID: config[CONF_DEVICE_ID],
ATTR_ACTION: config[CONF_TYPE],
ATTR_BUTTON_TYPE: config[CONF_SUBTYPE],
},
}
),
action,
trigger_info,
platform_type="device",
)

View File

@ -2,7 +2,7 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, TypedDict
from typing import Any, Final, TypedDict
from pylutron_caseta.smartbridge import Smartbridge
import voluptuous as vol
@ -45,11 +45,30 @@ class LutronKeypad(TypedDict):
buttons: list[int]
LUTRON_KEYPAD_LUTRON_DEVICE_ID: Final = "lutron_device_id"
LUTRON_KEYPAD_DEVICE_REGISTRY_DEVICE_ID: Final = "dr_device_id"
LUTRON_KEYPAD_AREA_ID: Final = "area_id"
LUTRON_KEYPAD_AREA_NAME: Final = "area_name"
LUTRON_KEYPAD_NAME: Final = "name"
LUTRON_KEYPAD_SERIAL: Final = "serial"
LUTRON_KEYPAD_DEVICE_INFO: Final = "device_info"
LUTRON_KEYPAD_MODEL: Final = "model"
LUTRON_KEYPAD_TYPE: Final = "type"
LUTRON_KEYPAD_BUTTONS: Final = "buttons"
class LutronButton(TypedDict):
"""A lutron_caseta button."""
lutron_device_id: int
leap_button_number: int
button_name: str
led_device_id: int
led_device_id: str | None
parent_keypad: int
LUTRON_BUTTON_LUTRON_DEVICE_ID: Final = "lutron_device_id"
LUTRON_BUTTON_LEAP_BUTTON_NUMBER: Final = "leap_button_number"
LUTRON_BUTTON_BUTTON_NAME: Final = "button_name"
LUTRON_BUTTON_LED_DEVICE_ID: Final = "led_device_id"
LUTRON_BUTTON_PARENT_KEYPAD: Final = "parent_keypad"

View File

@ -13,6 +13,7 @@ from homeassistant.components.lutron_caseta import (
ATTR_TYPE,
)
from homeassistant.components.lutron_caseta.const import (
ATTR_BUTTON_TYPE,
ATTR_LEAP_BUTTON_NUMBER,
CONF_CA_CERTS,
CONF_CERTFILE,
@ -23,6 +24,7 @@ from homeassistant.components.lutron_caseta.const import (
from homeassistant.components.lutron_caseta.device_trigger import CONF_SUBTYPE
from homeassistant.components.lutron_caseta.models import LutronCasetaData
from homeassistant.const import (
ATTR_DEVICE_ID,
CONF_DEVICE_ID,
CONF_DOMAIN,
CONF_HOST,
@ -254,6 +256,8 @@ async def test_if_fires_on_button_event(hass, calls, device_reg):
ATTR_DEVICE_NAME: device["Name"],
ATTR_AREA_NAME: device.get("Area", {}).get("Name"),
ATTR_ACTION: "press",
ATTR_DEVICE_ID: device_id,
ATTR_BUTTON_TYPE: "on",
}
hass.bus.async_fire(LUTRON_CASETA_BUTTON_EVENT, message)
await hass.async_block_till_done()
@ -298,6 +302,8 @@ async def test_if_fires_on_button_event_without_lip(hass, calls, device_reg):
ATTR_DEVICE_NAME: device["Name"],
ATTR_AREA_NAME: device.get("Area", {}).get("Name"),
ATTR_ACTION: "press",
ATTR_DEVICE_ID: device_id,
ATTR_BUTTON_TYPE: "Kitchen Pendants",
}
hass.bus.async_fire(LUTRON_CASETA_BUTTON_EVENT, message)
await hass.async_block_till_done()
@ -420,3 +426,56 @@ async def test_validate_trigger_invalid_triggers(hass, device_reg):
]
},
)
async def test_if_fires_on_button_event_late_setup(hass, calls):
"""Test for press trigger firing with integration getting setup late."""
config_entry_id = await _async_setup_lutron_with_picos(hass)
await hass.config_entries.async_unload(config_entry_id)
await hass.async_block_till_done()
device = MOCK_BUTTON_DEVICES[0]
dr = device_registry.async_get(hass)
dr_device = dr.async_get_device(identifiers={(DOMAIN, device["serial"])})
device_id = dr_device.id
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
CONF_PLATFORM: "device",
CONF_DOMAIN: DOMAIN,
CONF_DEVICE_ID: device_id,
CONF_TYPE: "press",
CONF_SUBTYPE: "on",
},
"action": {
"service": "test.automation",
"data_template": {"some": "test_trigger_button_press"},
},
},
]
},
)
await hass.config_entries.async_setup(config_entry_id)
await hass.async_block_till_done()
message = {
ATTR_SERIAL: device.get("serial"),
ATTR_TYPE: device.get("type"),
ATTR_LEAP_BUTTON_NUMBER: 0,
ATTR_DEVICE_NAME: device["Name"],
ATTR_AREA_NAME: device.get("Area", {}).get("Name"),
ATTR_ACTION: "press",
ATTR_DEVICE_ID: device_id,
ATTR_BUTTON_TYPE: "on",
}
hass.bus.async_fire(LUTRON_CASETA_BUTTON_EVENT, message)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["some"] == "test_trigger_button_press"