Add dynamic generation of device triggers from keypad buttons (#80797)

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Kevin Addeman 2022-10-23 15:57:04 -04:00 committed by GitHub
parent 0f50b2edd3
commit 071f335fdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 482 additions and 283 deletions

View File

@ -37,6 +37,7 @@ from .const import (
CONF_CA_CERTS,
CONF_CERTFILE,
CONF_KEYFILE,
CONF_SUBTYPE,
CONFIG_URL,
DOMAIN,
LUTRON_CASETA_BUTTON_EVENT,
@ -45,10 +46,11 @@ from .const import (
)
from .device_trigger import (
DEVICE_TYPE_SUBTYPE_MAP_TO_LIP,
KEYPAD_LEAP_BUTTON_NAME_OVERRIDE,
LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP,
_lutron_model_to_device_type,
LUTRON_BUTTON_TRIGGER_SCHEMA,
)
from .models import LutronCasetaData
from .models import LutronButton, LutronCasetaData, LutronKeypad, LutronKeypadData
from .util import serial_to_unique_id
_LOGGER = logging.getLogger(__name__)
@ -169,24 +171,25 @@ async def async_setup_entry(
_LOGGER.debug("Connected to Lutron Caseta bridge via LEAP at %s", host)
await _async_migrate_unique_ids(hass, config_entry)
devices = bridge.get_devices()
bridge_device = devices[BRIDGE_DEVICE_ID]
bridge_devices = bridge.get_devices()
bridge_device = bridge_devices[BRIDGE_DEVICE_ID]
if not config_entry.unique_id:
hass.config_entries.async_update_entry(
config_entry, unique_id=serial_to_unique_id(bridge_device["serial"])
)
buttons = bridge.buttons
_async_register_bridge_device(hass, entry_id, bridge_device, bridge)
button_devices, device_info_by_device_id = _async_register_button_devices(
hass, entry_id, bridge, bridge_device, buttons
)
_async_subscribe_pico_remote_events(hass, bridge, buttons)
keypad_data = _async_setup_keypads(hass, entry_id, bridge, bridge_device)
# Store this bridge (keyed by entry_id) so it can be retrieved by the
# platforms we're setting up.
hass.data[DOMAIN][entry_id] = LutronCasetaData(
bridge, bridge_device, button_devices, device_info_by_device_id
bridge,
bridge_device,
keypad_data,
)
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
@ -218,57 +221,154 @@ def _async_register_bridge_device(
@callback
def _async_register_button_devices(
def _async_setup_keypads(
hass: HomeAssistant,
config_entry_id: str,
bridge: Smartbridge,
bridge_device: dict[str, Any],
button_devices_by_id: dict[int, dict],
) -> tuple[dict[str, dict], dict[int, DeviceInfo]]:
"""Register button devices (Pico Remotes) in the device registry."""
) -> LutronKeypadData:
"""Register keypad devices (Keypads and Pico Remotes) in the device registry."""
device_registry = dr.async_get(hass)
button_devices_by_dr_id: dict[str, dict] = {}
device_info_by_device_id: dict[int, DeviceInfo] = {}
seen: set[str] = set()
bridge_devices = bridge.get_devices()
bridge_buttons = bridge.buttons
for device in button_devices_by_id.values():
dr_device_id_to_keypad: dict[str, LutronKeypad] = {}
keypads: dict[int, LutronKeypad] = {}
keypad_buttons: dict[int, LutronButton] = {}
keypad_button_names_to_leap: dict[int, dict[str, int]] = {}
ha_device = device
if "parent_device" in device and device["parent_device"] is not None:
# Device is a child of parent_device
# use the parent_device for HA device info
ha_device = bridge_devices[device["parent_device"]]
for bridge_button in bridge_buttons.values():
ha_device_serial = _handle_none_keypad_serial(
ha_device, bridge_device["serial"]
bridge_keypad = bridge_devices[bridge_button["parent_device"]]
keypad_device_id = bridge_keypad["device_id"]
button_device_id = bridge_button["device_id"]
if not (keypad := keypads.get(keypad_device_id)):
# First time seeing this keypad, build keypad data and store in keypads
keypad = keypads[keypad_device_id] = _async_build_lutron_keypad(
bridge, bridge_device, bridge_keypad, keypad_device_id
)
# Register the keypad device
dr_device = device_registry.async_get_or_create(
**keypad["device_info"], config_entry_id=config_entry_id
)
keypad["dr_device_id"] = dr_device.id
dr_device_id_to_keypad[dr_device.id] = keypad
# Add button to parent keypad, and build keypad_buttons and keypad_button_names_to_leap
button = keypad_buttons[button_device_id] = LutronButton(
lutron_device_id=button_device_id,
leap_button_number=bridge_button["button_number"],
button_name=_get_button_name(keypad, bridge_button),
led_device_id=bridge_button.get("button_led"),
parent_keypad=keypad["lutron_device_id"],
)
if "serial" not in ha_device or ha_device_serial in seen:
continue
seen.add(ha_device_serial)
keypad["buttons"].append(button["lutron_device_id"])
area = _area_name_from_id(bridge.areas, ha_device["area"])
# name field is still a combination of area and name from pylytron-caseta
# extract the name portion only.
name = ha_device["name"].split("_")[-1]
device_args: DeviceInfo = {
"name": f"{area} {name}",
"manufacturer": MANUFACTURER,
"identifiers": {(DOMAIN, ha_device_serial)},
"model": f"{ha_device['model']} ({ha_device['type']})",
"via_device": (DOMAIN, bridge_device["serial"]),
}
if area != UNASSIGNED_AREA:
device_args["suggested_area"] = area
dr_device = device_registry.async_get_or_create(
**device_args, config_entry_id=config_entry_id
keypad_button_names_to_leap.setdefault(keypad["lutron_device_id"], {}).update(
{button["button_name"]: int(button["leap_button_number"])}
)
button_devices_by_dr_id[dr_device.id] = ha_device
device_info_by_device_id.setdefault(ha_device["device_id"], device_args)
return button_devices_by_dr_id, device_info_by_device_id
keypad_trigger_schemas = _async_build_trigger_schemas(keypad_button_names_to_leap)
_async_subscribe_keypad_events(hass, bridge, keypads, keypad_buttons)
return LutronKeypadData(
dr_device_id_to_keypad,
keypads,
keypad_buttons,
keypad_button_names_to_leap,
keypad_trigger_schemas,
)
@callback
def _async_build_trigger_schemas(
keypad_button_names_to_leap: dict[int, dict[str, int]]
) -> dict[int, vol.Schema]:
"""Build device trigger schemas."""
return {
keypad_id: LUTRON_BUTTON_TRIGGER_SCHEMA.extend(
{
vol.Required(CONF_SUBTYPE): vol.In(
keypad_button_names_to_leap[keypad_id]
),
}
)
for keypad_id in keypad_button_names_to_leap
}
@callback
def _async_build_lutron_keypad(
bridge: Smartbridge,
bridge_device: dict[str, Any],
bridge_keypad: dict[str, Any],
keypad_device_id: int,
) -> LutronKeypad:
# First time seeing this keypad, build keypad data and store in keypads
area_name = _area_name_from_id(bridge.areas, bridge_keypad["area"])
keypad_name = bridge_keypad["name"].split("_")[-1]
keypad_serial = _handle_none_keypad_serial(bridge_keypad, bridge_device["serial"])
device_info = DeviceInfo(
name=f"{area_name} {keypad_name}",
manufacturer=MANUFACTURER,
identifiers={(DOMAIN, keypad_serial)},
model=f"{bridge_keypad['model']} ({bridge_keypad['type']})",
via_device=(DOMAIN, bridge_device["serial"]),
)
if area_name != UNASSIGNED_AREA:
device_info["suggested_area"] = area_name
return LutronKeypad(
lutron_device_id=keypad_device_id,
dr_device_id="",
area_id=bridge_keypad["area"],
area_name=area_name,
name=keypad_name,
serial=keypad_serial,
device_info=device_info,
model=bridge_keypad["model"],
type=bridge_keypad["type"],
buttons=[],
)
def _get_button_name(keypad: LutronKeypad, bridge_button: dict[str, Any]) -> str:
"""Get the LEAP button name and check for override."""
button_number = bridge_button["button_number"]
button_name = bridge_button.get("device_name")
if button_name is None:
# This is a Caseta Button retrieve name from hardcoded trigger definitions.
return _get_button_name_from_triggers(keypad, button_number)
keypad_model = keypad["model"]
if keypad_model_override := KEYPAD_LEAP_BUTTON_NAME_OVERRIDE.get(keypad_model):
if alt_button_name := keypad_model_override.get(button_number):
return alt_button_name
return button_name
def _get_button_name_from_triggers(keypad: LutronKeypad, button_number: int) -> str:
"""Retrieve the caseta button name from device triggers."""
button_number_map = LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP.get(keypad["type"], {})
return (
button_number_map.get(
button_number,
f"button {button_number}",
)
.replace("_", " ")
.title()
)
def _handle_none_keypad_serial(keypad_device: dict, bridge_serial: int) -> str:
@ -303,17 +403,19 @@ def async_get_lip_button(device_type: str, leap_button: int) -> int | None:
@callback
def _async_subscribe_pico_remote_events(
def _async_subscribe_keypad_events(
hass: HomeAssistant,
bridge_device: Smartbridge,
button_devices_by_id: dict[int, dict],
bridge: Smartbridge,
keypads: dict[int, Any],
keypad_buttons: dict[int, Any],
):
"""Subscribe to lutron events."""
dev_reg = dr.async_get(hass)
@callback
def _async_button_event(button_id, event_type):
if not (device := button_devices_by_id.get(button_id)):
if not (button := keypad_buttons.get(button_id)) or not (
keypad := keypads.get(button["parent_keypad"])
):
return
if event_type == BUTTON_STATUS_PRESSED:
@ -321,40 +423,26 @@ def _async_subscribe_pico_remote_events(
else:
action = ACTION_RELEASE
bridge_devices = bridge_device.get_devices()
ha_device = device
if "parent_device" in device and device["parent_device"] is not None:
# Device is a child of parent_device
# use the parent_device for HA device info
ha_device = bridge_devices[device["parent_device"]]
ha_device_serial = _handle_none_keypad_serial(
ha_device, bridge_devices[BRIDGE_DEVICE_ID]["serial"]
)
type_ = _lutron_model_to_device_type(ha_device["model"], ha_device["type"])
area = _area_name_from_id(bridge_device.areas, ha_device["area"])
name = ha_device["name"].split("_")[-1]
leap_button_number = device["button_number"]
lip_button_number = async_get_lip_button(type_, leap_button_number)
hass_device = dev_reg.async_get_device({(DOMAIN, ha_device_serial)})
keypad_type = keypad["type"]
leap_button_number = button["leap_button_number"]
lip_button_number = async_get_lip_button(keypad_type, leap_button_number)
hass.bus.async_fire(
LUTRON_CASETA_BUTTON_EVENT,
{
ATTR_SERIAL: ha_device_serial,
ATTR_TYPE: type_,
ATTR_SERIAL: keypad["serial"],
ATTR_TYPE: keypad_type,
ATTR_BUTTON_NUMBER: lip_button_number,
ATTR_LEAP_BUTTON_NUMBER: leap_button_number,
ATTR_DEVICE_NAME: name,
ATTR_DEVICE_ID: hass_device.id,
ATTR_AREA_NAME: area,
ATTR_DEVICE_NAME: keypad["name"],
ATTR_DEVICE_ID: keypad["dr_device_id"],
ATTR_AREA_NAME: keypad["area_name"],
ATTR_ACTION: action,
},
)
for button_id in button_devices_by_id:
bridge_device.add_button_subscriber(
for button_id in keypad_buttons:
bridge.add_button_subscriber(
str(button_id),
lambda event_type, button_id=button_id: _async_button_event(
button_id, event_type

View File

@ -11,10 +11,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import LutronCasetaDevice
from .const import DOMAIN as CASETA_DOMAIN
from .device_trigger import (
LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP,
_lutron_model_to_device_type,
)
from .device_trigger import LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP
from .models import LutronCasetaData
@ -28,12 +25,13 @@ async def async_setup_entry(
bridge = data.bridge
button_devices = bridge.get_buttons()
all_devices = data.bridge.get_devices()
device_info_by_device_id = data.device_info_by_device_id
keypads = data.keypad_data.keypads
entities: list[LutronCasetaButton] = []
for device in button_devices.values():
parent_device_info = device_info_by_device_id[device["parent_device"]]
parent_keypad = keypads[device["parent_device"]]
parent_device_info = parent_keypad["device_info"]
enabled_default = True
if not (device_name := device.get("device_name")):
@ -43,9 +41,7 @@ async def async_setup_entry(
enabled_default = False
keypad_device = all_devices[device["parent_device"]]
button_numbers = LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP.get(
_lutron_model_to_device_type(
keypad_device["model"], keypad_device["type"]
),
keypad_device["type"],
{},
)
device_name = (

View File

@ -1,6 +1,8 @@
"""Provides device triggers for lutron caseta."""
from __future__ import annotations
import logging
import voluptuous as vol
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
@ -17,7 +19,6 @@ from homeassistant.const import (
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.typing import ConfigType
@ -33,19 +34,14 @@ from .const import (
)
from .models import LutronCasetaData
_LOGGER = logging.getLogger(__name__)
def _reverse_dict(forward_dict: dict) -> dict:
"""Reverse a dictionary."""
return {v: k for k, v in forward_dict.items()}
LUTRON_MODEL_TO_TYPE = {
"RRST-W2B-XX": "SunnataKeypad_2Button",
"RRST-W3RL-XX": "SunnataKeypad_3ButtonRaiseLower",
"RRST-W4B-XX": "SunnataKeypad_4Button",
}
SUPPORTED_INPUTS_EVENTS_TYPES = [ACTION_PRESS, ACTION_RELEASE]
LUTRON_BUTTON_TRIGGER_SCHEMA = DEVICE_TRIGGER_BASE_SCHEMA.extend(
@ -55,6 +51,20 @@ LUTRON_BUTTON_TRIGGER_SCHEMA = DEVICE_TRIGGER_BASE_SCHEMA.extend(
)
KEYPAD_LEAP_BUTTON_NAME_OVERRIDE = {
"RRD-W2RLD": {
17: "raise_1",
16: "lower_1",
19: "raise_2",
18: "lower_2",
},
"RRD-W1RLD": {
19: "raise",
18: "lower",
},
}
PICO_2_BUTTON_BUTTON_TYPES_TO_LIP = {
"on": 2,
"off": 4,
@ -271,72 +281,6 @@ FOUR_GROUP_REMOTE_TRIGGER_SCHEMA = LUTRON_BUTTON_TRIGGER_SCHEMA.extend(
)
SUNNATA_KEYPAD_2_BUTTON_BUTTON_TYPES_TO_LEAP = {
"button_1": 1,
"button_2": 2,
}
SUNNATA_KEYPAD_2_BUTTON_TRIGGER_SCHEMA = LUTRON_BUTTON_TRIGGER_SCHEMA.extend(
{
vol.Required(CONF_SUBTYPE): vol.In(
SUNNATA_KEYPAD_2_BUTTON_BUTTON_TYPES_TO_LEAP
),
}
)
SUNNATA_KEYPAD_3_BUTTON_RAISE_LOWER_BUTTON_TYPES_TO_LEAP = {
"button_1": 1,
"button_2": 2,
"button_3": 3,
"raise": 19,
"lower": 18,
}
SUNNATA_KEYPAD_3_BUTTON_RAISE_LOWER_TRIGGER_SCHEMA = (
LUTRON_BUTTON_TRIGGER_SCHEMA.extend(
{
vol.Required(CONF_SUBTYPE): vol.In(
SUNNATA_KEYPAD_3_BUTTON_RAISE_LOWER_BUTTON_TYPES_TO_LEAP
),
}
)
)
SUNNATA_KEYPAD_4_BUTTON_BUTTON_TYPES_TO_LEAP = {
"button_1": 1,
"button_2": 2,
"button_3": 3,
"button_4": 4,
}
SUNNATA_KEYPAD_4_BUTTON_TRIGGER_SCHEMA = LUTRON_BUTTON_TRIGGER_SCHEMA.extend(
{
vol.Required(CONF_SUBTYPE): vol.In(
SUNNATA_KEYPAD_4_BUTTON_BUTTON_TYPES_TO_LEAP
),
}
)
HOMEOWNER_KEYPAD_BUTTON_TYPES_TO_LEAP = {
"button_1": 1,
"button_2": 2,
"button_3": 3,
"button_4": 4,
"button_5": 5,
"button_6": 6,
"button_7": 7,
}
HOMEOWNER_KEYPAD_BUTTON_TRIGGER_SCHEMA = LUTRON_BUTTON_TRIGGER_SCHEMA.extend(
{
vol.Required(CONF_SUBTYPE): vol.In(HOMEOWNER_KEYPAD_BUTTON_TYPES_TO_LEAP),
}
)
PHANTOM_KEYPAD_BUTTON_TYPES_TO_LEAP: dict[str, int] = {}
PHANTOM_KEYPAD_BUTTON_TRIGGER_SCHEMA = LUTRON_BUTTON_TRIGGER_SCHEMA.extend(
{
vol.Required(CONF_SUBTYPE): vol.In(PHANTOM_KEYPAD_BUTTON_TYPES_TO_LEAP),
}
)
DEVICE_TYPE_SCHEMA_MAP = {
"Pico2Button": PICO_2_BUTTON_TRIGGER_SCHEMA,
"Pico2ButtonRaiseLower": PICO_2_BUTTON_RAISE_LOWER_TRIGGER_SCHEMA,
@ -347,11 +291,6 @@ DEVICE_TYPE_SCHEMA_MAP = {
"Pico4ButtonZone": PICO_4_BUTTON_ZONE_TRIGGER_SCHEMA,
"Pico4Button2Group": PICO_4_BUTTON_2_GROUP_TRIGGER_SCHEMA,
"FourGroupRemote": FOUR_GROUP_REMOTE_TRIGGER_SCHEMA,
"SunnataKeypad_2Button": SUNNATA_KEYPAD_2_BUTTON_TRIGGER_SCHEMA,
"SunnataKeypad_3ButtonRaiseLower": SUNNATA_KEYPAD_3_BUTTON_RAISE_LOWER_TRIGGER_SCHEMA,
"SunnataKeypad_4Button": SUNNATA_KEYPAD_4_BUTTON_TRIGGER_SCHEMA,
"HomeownerKeypad": HOMEOWNER_KEYPAD_BUTTON_TRIGGER_SCHEMA,
"PhantomKeypad": PHANTOM_KEYPAD_BUTTON_TRIGGER_SCHEMA,
}
DEVICE_TYPE_SUBTYPE_MAP_TO_LIP = {
@ -376,11 +315,6 @@ DEVICE_TYPE_SUBTYPE_MAP_TO_LEAP = {
"Pico4ButtonZone": PICO_4_BUTTON_ZONE_BUTTON_TYPES_TO_LEAP,
"Pico4Button2Group": PICO_4_BUTTON_2_GROUP_BUTTON_TYPES_TO_LEAP,
"FourGroupRemote": FOUR_GROUP_REMOTE_BUTTON_TYPES_TO_LEAP,
"SunnataKeypad_2Button": SUNNATA_KEYPAD_2_BUTTON_BUTTON_TYPES_TO_LEAP,
"SunnataKeypad_3ButtonRaiseLower": SUNNATA_KEYPAD_3_BUTTON_RAISE_LOWER_BUTTON_TYPES_TO_LEAP,
"SunnataKeypad_4Button": SUNNATA_KEYPAD_4_BUTTON_BUTTON_TYPES_TO_LEAP,
"HomeownerKeypad": HOMEOWNER_KEYPAD_BUTTON_TYPES_TO_LEAP,
"PhantomKeypad": PHANTOM_KEYPAD_BUTTON_TYPES_TO_LEAP,
}
LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP = {
@ -395,32 +329,52 @@ TRIGGER_SCHEMA = vol.Any(
PICO_4_BUTTON_ZONE_TRIGGER_SCHEMA,
PICO_4_BUTTON_2_GROUP_TRIGGER_SCHEMA,
FOUR_GROUP_REMOTE_TRIGGER_SCHEMA,
SUNNATA_KEYPAD_2_BUTTON_TRIGGER_SCHEMA,
SUNNATA_KEYPAD_3_BUTTON_RAISE_LOWER_TRIGGER_SCHEMA,
SUNNATA_KEYPAD_4_BUTTON_TRIGGER_SCHEMA,
HOMEOWNER_KEYPAD_BUTTON_TRIGGER_SCHEMA,
PHANTOM_KEYPAD_BUTTON_TRIGGER_SCHEMA,
)
async def async_validate_trigger_config(
hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
# if device is available verify parameters against device capabilities
device = get_button_device_by_dr_id(hass, config[CONF_DEVICE_ID])
"""Validate trigger config."""
if not device:
device_id = config[CONF_DEVICE_ID]
subtype = config[CONF_SUBTYPE]
if not (data := get_lutron_data_by_dr_id(hass, device_id)) or not (
keypad := data.keypad_data.dr_device_id_to_keypad.get(device_id)
):
return config
keypad_trigger_schemas = data.keypad_data.trigger_schemas
keypad_button_names_to_leap = data.keypad_data.button_names_to_leap
# Retrieve trigger schema, preferring hard-coded triggers from device_trigger.py
if not (
schema := DEVICE_TYPE_SCHEMA_MAP.get(
_lutron_model_to_device_type(device["model"], device["type"])
keypad["type"],
keypad_trigger_schemas.get(keypad["lutron_device_id"]),
)
):
raise InvalidDeviceAutomationConfig(
f"Device model {device['model']} with type {device['type']} not supported: {config[CONF_DEVICE_ID]}"
# Trigger schema not found - log error
_LOGGER.error(
"Cannot validate trigger %s because the trigger schema was not found",
config,
)
return config
# Retrieve list of valid buttons, preferring hard-coded triggers from device_trigger.py
device_type = keypad["type"]
valid_buttons = DEVICE_TYPE_SUBTYPE_MAP_TO_LEAP.get(
device_type,
keypad_button_names_to_leap[keypad["lutron_device_id"]],
)
if subtype not in valid_buttons:
# Trigger subtype is invalid - raise error
_LOGGER.error(
"Cannot validate trigger %s because subtype %s is invalid", config, subtype
)
return config
return schema(config)
@ -431,12 +385,18 @@ async def async_get_triggers(
"""List device triggers for lutron caseta devices."""
triggers = []
if not (device := get_button_device_by_dr_id(hass, device_id)):
# Check if device is a valid button device. Return empty if not.
# Check if device is a valid keypad. Return empty if not.
if not (data := get_lutron_data_by_dr_id(hass, device_id)) or not (
keypad := data.keypad_data.dr_device_id_to_keypad.get(device_id)
):
return []
keypad_button_names_to_leap = data.keypad_data.button_names_to_leap
# Retrieve list of valid buttons, preferring hard-coded triggers from device_trigger.py
valid_buttons = DEVICE_TYPE_SUBTYPE_MAP_TO_LEAP.get(
_lutron_model_to_device_type(device["model"], device["type"]), {}
keypad["type"],
keypad_button_names_to_leap[keypad["lutron_device_id"]],
)
for trigger in SUPPORTED_INPUTS_EVENTS_TYPES:
@ -454,18 +414,6 @@ async def async_get_triggers(
return triggers
def _device_model_to_type(device_registry_model: str) -> str:
"""Convert a lutron_caseta device registry entry model to type."""
model_list = device_registry_model.split(" ")
device_type = model_list.pop().replace("(", "").replace(")", "")
return _lutron_model_to_device_type(" ".join(model_list), device_type)
def _lutron_model_to_device_type(model: str, device_type: str) -> str:
"""Get the mapped type based on the lutron model or type."""
return LUTRON_MODEL_TO_TYPE.get(model, device_type)
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
@ -473,42 +421,63 @@ async def async_attach_trigger(
trigger_info: TriggerInfo,
) -> CALLBACK_TYPE:
"""Attach a trigger."""
device_registry = dr.async_get(hass)
if (
not (device := device_registry.async_get(config[CONF_DEVICE_ID]))
or not device.model
device_id = config[CONF_DEVICE_ID]
subtype = config[CONF_SUBTYPE]
if not (data := get_lutron_data_by_dr_id(hass, device_id)) or not (
keypad := data.keypad_data.dr_device_id_to_keypad[device_id]
):
raise HomeAssistantError(
f"Cannot attach trigger {config} because device with id {config[CONF_DEVICE_ID]} is missing or invalid"
f"Cannot attach trigger {config} because device with id {device_id} is missing or invalid"
)
device_type = _device_model_to_type(device.model)
_, serial = list(device.identifiers)[0]
schema = DEVICE_TYPE_SCHEMA_MAP[device_type]
valid_buttons = DEVICE_TYPE_SUBTYPE_MAP_TO_LEAP[device_type]
keypad_trigger_schemas = data.keypad_data.trigger_schemas
keypad_button_names_to_leap = data.keypad_data.button_names_to_leap
device_type = keypad["type"]
serial = keypad["serial"]
lutron_device_id = keypad["lutron_device_id"]
# Retrieve trigger schema, preferring hard-coded triggers from device_trigger.py
schema = DEVICE_TYPE_SCHEMA_MAP.get(
device_type,
keypad_trigger_schemas[lutron_device_id],
)
# Retrieve list of valid buttons, preferring hard-coded triggers from device_trigger.py
valid_buttons = DEVICE_TYPE_SUBTYPE_MAP_TO_LEAP.get(
device_type,
keypad_button_names_to_leap[lutron_device_id],
)
if subtype not in valid_buttons:
raise InvalidDeviceAutomationConfig(
f"Cannot attach trigger {config} because subtype {subtype} is invalid"
)
config = schema(config)
event_config = {
event_trigger.CONF_PLATFORM: CONF_EVENT,
event_trigger.CONF_EVENT_TYPE: LUTRON_CASETA_BUTTON_EVENT,
event_trigger.CONF_EVENT_DATA: {
ATTR_SERIAL: serial,
ATTR_LEAP_BUTTON_NUMBER: valid_buttons[config[CONF_SUBTYPE]],
ATTR_LEAP_BUTTON_NUMBER: valid_buttons[subtype],
ATTR_ACTION: config[CONF_TYPE],
},
}
event_config = event_trigger.TRIGGER_SCHEMA(event_config)
return await event_trigger.async_attach_trigger(
hass, event_config, action, trigger_info, platform_type="device"
)
def get_button_device_by_dr_id(hass: HomeAssistant, device_id: str):
"""Get a lutron device for the given device id."""
def get_lutron_data_by_dr_id(hass: HomeAssistant, device_id: str):
"""Get a lutron integration data for the given device registry device id."""
if DOMAIN not in hass.data:
return None
for entry_id in hass.data[DOMAIN]:
data: LutronCasetaData = hass.data[DOMAIN][entry_id]
if device := data.button_devices.get(device_id):
return device
if data.keypad_data.dr_device_id_to_keypad.get(device_id):
return data
return None

View File

@ -21,11 +21,16 @@ async def async_get_config_entry_diagnostics(
"title": entry.title,
"data": dict(entry.data),
},
"data": {
"bridge_data": {
"devices": bridge.devices,
"buttons": bridge.buttons,
"scenes": bridge.scenes,
"occupancy_groups": bridge.occupancy_groups,
"areas": bridge.areas,
},
"integration_data": {
"keypad_button_names_to_leap": data.keypad_data.button_names_to_leap,
"keypad_buttons": data.keypad_data.buttons,
"keypads": data.keypad_data.keypads,
},
}

View File

@ -4,6 +4,7 @@ from __future__ import annotations
from collections.abc import Callable
from homeassistant.components.logbook import LOGBOOK_ENTRY_MESSAGE, LOGBOOK_ENTRY_NAME
from homeassistant.const import ATTR_DEVICE_ID
from homeassistant.core import Event, HomeAssistant, callback
from .const import (
@ -15,7 +16,11 @@ from .const import (
DOMAIN,
LUTRON_CASETA_BUTTON_EVENT,
)
from .device_trigger import LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP
from .device_trigger import (
LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP,
_reverse_dict,
get_lutron_data_by_dr_id,
)
@callback
@ -28,11 +33,28 @@ def async_describe_events(
@callback
def async_describe_button_event(event: Event) -> dict[str, str]:
"""Describe lutron_caseta_button_event logbook event."""
data = event.data
device_type = data[ATTR_TYPE]
leap_button_number = data[ATTR_LEAP_BUTTON_NUMBER]
button_map = LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP[device_type]
button_description = button_map[leap_button_number]
dr_device_id = data[ATTR_DEVICE_ID]
lutron_data = get_lutron_data_by_dr_id(hass, dr_device_id)
keypad = lutron_data.keypad_data.dr_device_id_to_keypad.get(dr_device_id)
keypad_id = keypad["lutron_device_id"]
keypad_button_names_to_leap = lutron_data.keypad_data.button_names_to_leap
if not (rev_button_map := LEAP_TO_DEVICE_TYPE_SUBTYPE_MAP.get(device_type)):
if fwd_button_map := keypad_button_names_to_leap.get(keypad_id):
rev_button_map = _reverse_dict(fwd_button_map)
if rev_button_map is None:
return {
LOGBOOK_ENTRY_NAME: f"{data[ATTR_AREA_NAME]} {data[ATTR_DEVICE_NAME]}",
LOGBOOK_ENTRY_MESSAGE: f"{data[ATTR_ACTION]} Error retrieving button description",
}
button_description = rev_button_map.get(leap_button_number)
return {
LOGBOOK_ENTRY_NAME: f"{data[ATTR_AREA_NAME]} {data[ATTR_DEVICE_NAME]}",
LOGBOOK_ENTRY_MESSAGE: f"{data[ATTR_ACTION]} {button_description}",

View File

@ -2,9 +2,10 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from typing import Any, TypedDict
from pylutron_caseta.smartbridge import Smartbridge
import voluptuous as vol
from homeassistant.helpers.entity import DeviceInfo
@ -15,5 +16,40 @@ class LutronCasetaData:
bridge: Smartbridge
bridge_device: dict[str, Any]
button_devices: dict[str, dict]
device_info_by_device_id: dict[int, DeviceInfo]
keypad_data: LutronKeypadData
@dataclass
class LutronKeypadData:
"""Data for the lutron_caseta integration keypads."""
dr_device_id_to_keypad: dict[str, LutronKeypad]
keypads: dict[int, LutronKeypad]
buttons: dict[int, LutronButton]
button_names_to_leap: dict[int, dict[str, int]]
trigger_schemas: dict[int, vol.Schema]
class LutronKeypad(TypedDict):
"""A lutron_caseta keypad device."""
lutron_device_id: int
dr_device_id: str
area_id: int
area_name: str
name: str
serial: str
device_info: DeviceInfo
model: str
type: str
buttons: list[int]
class LutronButton(TypedDict):
"""A lutron_caseta button."""
lutron_device_id: int
leap_button_number: int
button_name: str
led_device_id: int
parent_keypad: int

View File

@ -42,7 +42,9 @@ class LutronCasetaLight(LutronCasetaDeviceUpdatableEntity, SwitchEntity):
if "parent_device" not in device:
return
parent_device_info = data.device_info_by_device_id.get(device["parent_device"])
keypads = data.keypad_data.keypads
parent_keypad = keypads[device["parent_device"]]
parent_device_info = parent_keypad["device_info"]
# Append the child device name to the end of the parent keypad name to create the entity name
self._attr_name = f'{parent_device_info["name"]} {device["device_name"]}'
# Set the device_info to the same as the Parent Keypad

View File

@ -250,7 +250,7 @@ class MockBridge:
"111": {
"device_id": "111",
"current_state": "Release",
"button_number": 0,
"button_number": 1,
"name": "Dining Room_Pico",
"type": "Pico3ButtonRaiseLower",
"model": "PJ2-3BRL-GXX-X01",

View File

@ -15,7 +15,7 @@ async def test_button_unique_id(hass: HomeAssistant) -> None:
ra3_button_entity_id = (
"button.hallway_main_stairs_position_1_keypad_kitchen_pendants"
)
caseta_button_entity_id = "button.dining_room_pico_on"
caseta_button_entity_id = "button.dining_room_pico_stop"
entity_registry = er.async_get(hass)

View File

@ -1,5 +1,5 @@
"""The tests for Lutron Caséta device triggers."""
from unittest.mock import MagicMock
from unittest.mock import patch
import pytest
@ -14,16 +14,26 @@ from homeassistant.components.lutron_caseta import (
)
from homeassistant.components.lutron_caseta.const import (
ATTR_LEAP_BUTTON_NUMBER,
CONF_CA_CERTS,
CONF_CERTFILE,
CONF_KEYFILE,
DOMAIN,
LUTRON_CASETA_BUTTON_EVENT,
MANUFACTURER,
)
from homeassistant.components.lutron_caseta.device_trigger import CONF_SUBTYPE
from homeassistant.components.lutron_caseta.models import LutronCasetaData
from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_PLATFORM, CONF_TYPE
from homeassistant.const import (
CONF_DEVICE_ID,
CONF_DOMAIN,
CONF_HOST,
CONF_PLATFORM,
CONF_TYPE,
)
from homeassistant.helpers import device_registry
from homeassistant.setup import async_setup_component
from . import MockBridge
from tests.common import (
MockConfigEntry,
assert_lists_same,
@ -34,8 +44,8 @@ from tests.common import (
MOCK_BUTTON_DEVICES = [
{
"device_id": "710",
"Name": "Back Hall Pico",
"device_id": "9",
"Name": "Dining Room_Pico",
"ID": 2,
"Area": {"Name": "Back Hall"},
"Buttons": [
@ -45,14 +55,14 @@ MOCK_BUTTON_DEVICES = [
{"Number": 5},
{"Number": 6},
],
"leap_name": "Back Hall_Back Hall Pico",
"leap_name": "Dining Room_Pico",
"type": "Pico3ButtonRaiseLower",
"model": "PJ2-3BRL-GXX-X01",
"serial": 43845548,
"serial": 68551522,
},
{
"device_id": "742",
"Name": "Front Steps Sunnata Keypad",
"device_id": "1355",
"Name": "Main Stairs Position 1 Keypad",
"ID": 3,
"Area": {"Name": "Front Steps"},
"Buttons": [
@ -65,12 +75,12 @@ MOCK_BUTTON_DEVICES = [
"leap_name": "Front Steps_Front Steps Sunnata Keypad",
"type": "SunnataKeypad",
"model": "RRST-W4B-XX",
"serial": 43845547,
"serial": 66286451,
},
{
"device_id": "786",
"Name": "Example Homeowner Keypad",
"ID": 3,
"ID": 4,
"Area": {"Name": "Front Steps"},
"Buttons": [
{"Number": 12},
@ -84,7 +94,7 @@ MOCK_BUTTON_DEVICES = [
"leap_name": "Front Steps_Example Homeowner Keypad",
"type": "HomeownerKeypad",
"model": "Homeowner Keypad",
"serial": None,
"serial": "1234_786",
},
]
@ -101,39 +111,36 @@ def device_reg(hass):
return mock_device_registry(hass)
async def _async_setup_lutron_with_picos(hass, device_reg):
async def _async_setup_lutron_with_picos(hass):
"""Setups a lutron bridge with picos."""
await async_setup_component(hass, DOMAIN, {})
config_entry = MockConfigEntry(domain=DOMAIN, data={})
config_entry.add_to_hass(hass)
dr_button_devices = {}
device_info_by_device_id = {}
for device in MOCK_BUTTON_DEVICES:
device_args = {
"name": device["leap_name"],
"manufacturer": MANUFACTURER,
"config_entry_id": config_entry.entry_id,
"identifiers": {(DOMAIN, device["serial"])},
"model": f"{device['model']} ({device[CONF_TYPE]})",
}
dr_device = device_reg.async_get_or_create(**device_args)
dr_button_devices[dr_device.id] = device
device_info_by_device_id.setdefault(device["device_id"], device_args)
hass.data[DOMAIN][config_entry.entry_id] = LutronCasetaData(
MagicMock(), MagicMock(), dr_button_devices, device_info_by_device_id
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "1.1.1.1",
CONF_KEYFILE: "",
CONF_CERTFILE: "",
CONF_CA_CERTS: "",
},
unique_id="abc",
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.lutron_caseta.Smartbridge.create_tls",
return_value=MockBridge(can_connect=True),
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry.entry_id
async def test_get_triggers(hass, device_reg):
"""Test we get the expected triggers from a lutron pico."""
config_entry_id = await _async_setup_lutron_with_picos(hass, device_reg)
config_entry_id = await _async_setup_lutron_with_picos(hass)
data: LutronCasetaData = hass.data[DOMAIN][config_entry_id]
dr_button_devices = data.button_devices
device_id = list(dr_button_devices)[0]
keypads = data.keypad_data.keypads
device_id = keypads[list(keypads)[0]]["dr_device_id"]
expected_triggers = [
{
@ -167,7 +174,7 @@ async def test_get_triggers(hass, device_reg):
async def test_get_triggers_for_invalid_device_id(hass, device_reg):
"""Test error raised for invalid lutron device_id."""
config_entry_id = await _async_setup_lutron_with_picos(hass, device_reg)
config_entry_id = await _async_setup_lutron_with_picos(hass)
invalid_device = device_reg.async_get_or_create(
config_entry_id=config_entry_id,
@ -183,7 +190,7 @@ async def test_get_triggers_for_invalid_device_id(hass, device_reg):
async def test_get_triggers_for_non_button_device(hass, device_reg):
"""Test error raised for invalid lutron device_id."""
config_entry_id = await _async_setup_lutron_with_picos(hass, device_reg)
config_entry_id = await _async_setup_lutron_with_picos(hass)
invalid_device = device_reg.async_get_or_create(
config_entry_id=config_entry_id,
@ -199,7 +206,7 @@ async def test_get_triggers_for_non_button_device(hass, device_reg):
async def test_none_serial_keypad(hass, device_reg):
"""Test serial assignment for keypads without serials."""
config_entry_id = await _async_setup_lutron_with_picos(hass, device_reg)
config_entry_id = await _async_setup_lutron_with_picos(hass)
keypad_device = device_reg.async_get_or_create(
config_entry_id=config_entry_id,
@ -211,11 +218,13 @@ async def test_none_serial_keypad(hass, device_reg):
async def test_if_fires_on_button_event(hass, calls, device_reg):
"""Test for press trigger firing."""
await _async_setup_lutron_with_picos(hass, device_reg)
await _async_setup_lutron_with_picos(hass)
device = MOCK_BUTTON_DEVICES[0]
dr = device_registry.async_get(hass)
dr_device = dr.async_get_device(identifiers={(DOMAIN, device["serial"])})
device_id = dr_device.id
assert await async_setup_component(
hass,
automation.DOMAIN,
@ -255,7 +264,7 @@ async def test_if_fires_on_button_event(hass, calls, device_reg):
async def test_if_fires_on_button_event_without_lip(hass, calls, device_reg):
"""Test for press trigger firing on a device that does not support lip."""
await _async_setup_lutron_with_picos(hass, device_reg)
await _async_setup_lutron_with_picos(hass)
device = MOCK_BUTTON_DEVICES[1]
dr = device_registry.async_get(hass)
dr_device = dr.async_get_device(identifiers={(DOMAIN, device["serial"])})
@ -271,7 +280,7 @@ async def test_if_fires_on_button_event_without_lip(hass, calls, device_reg):
CONF_DOMAIN: DOMAIN,
CONF_DEVICE_ID: device_id,
CONF_TYPE: "press",
CONF_SUBTYPE: "button_1",
CONF_SUBTYPE: "Kitchen Pendants",
},
"action": {
"service": "test.automation",
@ -285,7 +294,7 @@ async def test_if_fires_on_button_event_without_lip(hass, calls, device_reg):
message = {
ATTR_SERIAL: device.get("serial"),
ATTR_TYPE: device.get("type"),
ATTR_LEAP_BUTTON_NUMBER: 1,
ATTR_LEAP_BUTTON_NUMBER: 3,
ATTR_DEVICE_NAME: device["Name"],
ATTR_AREA_NAME: device.get("Area", {}).get("Name"),
ATTR_ACTION: "press",
@ -338,12 +347,13 @@ async def test_validate_trigger_config_no_device(hass, calls, device_reg):
async def test_validate_trigger_config_unknown_device(hass, calls, device_reg):
"""Test for no press with an unknown device."""
config_entry_id = await _async_setup_lutron_with_picos(hass, device_reg)
config_entry_id = await _async_setup_lutron_with_picos(hass)
data: LutronCasetaData = hass.data[DOMAIN][config_entry_id]
dr_button_devices = data.button_devices
device_id = list(dr_button_devices)[0]
device = dr_button_devices[device_id]
device["type"] = "unknown"
keypads = data.keypad_data.keypads
lutron_device_id = list(keypads)[0]
keypad = keypads[lutron_device_id]
device_id = keypad["dr_device_id"]
keypad["type"] = "unknown"
assert await async_setup_component(
hass,
@ -382,10 +392,13 @@ async def test_validate_trigger_config_unknown_device(hass, calls, device_reg):
async def test_validate_trigger_invalid_triggers(hass, device_reg):
"""Test for click_event with invalid triggers."""
config_entry_id = await _async_setup_lutron_with_picos(hass, device_reg)
config_entry_id = await _async_setup_lutron_with_picos(hass)
data: LutronCasetaData = hass.data[DOMAIN][config_entry_id]
dr_button_devices = data.button_devices
device_id = list(dr_button_devices)[0]
keypads = data.keypad_data.keypads
lutron_device_id = list(keypads)[0]
keypad = keypads[lutron_device_id]
device_id = keypad["dr_device_id"]
assert await async_setup_component(
hass,
automation.DOMAIN,

View File

@ -1,6 +1,6 @@
"""Test the Lutron Caseta diagnostics."""
from unittest.mock import patch
from unittest.mock import ANY, patch
from homeassistant.components.lutron_caseta import DOMAIN
from homeassistant.components.lutron_caseta.const import (
@ -39,7 +39,7 @@ async def test_diagnostics(hass, hass_client) -> None:
diag = await get_diagnostics_for_config_entry(hass, hass_client, config_entry)
assert diag == {
"data": {
"bridge_data": {
"areas": {
"898": {"id": "898", "name": "Basement", "parent_id": None},
"822": {"id": "822", "name": "Bedroom", "parent_id": "898"},
@ -53,7 +53,7 @@ async def test_diagnostics(hass, hass_client) -> None:
"111": {
"device_id": "111",
"current_state": "Release",
"button_number": 0,
"button_number": 1,
"name": "Dining Room_Pico",
"type": "Pico3ButtonRaiseLower",
"model": "PJ2-3BRL-GXX-X01",
@ -185,4 +185,66 @@ async def test_diagnostics(hass, hass_client) -> None:
"data": {"ca_certs": "", "certfile": "", "host": "1.1.1.1", "keyfile": ""},
"title": "Mock Title",
},
"integration_data": {
"keypad_button_names_to_leap": {
"1355": {"Kitchen Pendants": 3},
"9": {"Stop": 1},
},
"keypad_buttons": {
"111": {
"button_name": "Stop",
"leap_button_number": 1,
"led_device_id": None,
"lutron_device_id": "111",
"parent_keypad": "9",
},
"1372": {
"button_name": "Kitchen " "Pendants",
"leap_button_number": 3,
"led_device_id": "1362",
"lutron_device_id": "1372",
"parent_keypad": "1355",
},
},
"keypads": {
"1355": {
"area_id": "1205",
"area_name": "Hallway",
"buttons": ["1372"],
"device_info": {
"identifiers": [["lutron_caseta", 66286451]],
"manufacturer": "Lutron " "Electronics " "Co., " "Inc",
"model": "RRST-W3RL-XX " "(SunnataKeypad)",
"name": "Hallway " "Main " "Stairs " "Position 1 " "Keypad",
"suggested_area": "Hallway",
"via_device": ["lutron_caseta", 1234],
},
"dr_device_id": ANY,
"lutron_device_id": "1355",
"model": "RRST-W3RL-XX",
"name": "Main Stairs Position 1 " "Keypad",
"serial": 66286451,
"type": "SunnataKeypad",
},
"9": {
"area_id": "1026",
"area_name": "Dining Room",
"buttons": ["111"],
"device_info": {
"identifiers": [["lutron_caseta", 68551522]],
"manufacturer": "Lutron " "Electronics " "Co., " "Inc",
"model": "PJ2-3BRL-GXX-X01 " "(Pico3ButtonRaiseLower)",
"name": "Dining Room " "Pico",
"suggested_area": "Dining " "Room",
"via_device": ["lutron_caseta", 1234],
},
"dr_device_id": ANY,
"lutron_device_id": "9",
"model": "PJ2-3BRL-GXX-X01",
"name": "Pico",
"serial": 68551522,
"type": "Pico3ButtonRaiseLower",
},
},
},
}

View File

@ -15,6 +15,7 @@ from homeassistant.components.lutron_caseta.const import (
DOMAIN,
LUTRON_CASETA_BUTTON_EVENT,
)
from homeassistant.components.lutron_caseta.models import LutronCasetaData
from homeassistant.const import ATTR_DEVICE_ID, CONF_HOST
from homeassistant.setup import async_setup_component
@ -49,25 +50,30 @@ async def test_humanify_lutron_caseta_button_event(hass):
await hass.async_block_till_done()
data: LutronCasetaData = hass.data[DOMAIN][config_entry.entry_id]
keypads = data.keypad_data.keypads
keypad = keypads["9"]
dr_device_id = keypad["dr_device_id"]
(event1,) = mock_humanify(
hass,
[
MockRow(
LUTRON_CASETA_BUTTON_EVENT,
{
ATTR_SERIAL: "123",
ATTR_DEVICE_ID: "1234",
ATTR_SERIAL: "68551522",
ATTR_DEVICE_ID: dr_device_id,
ATTR_TYPE: "Pico3ButtonRaiseLower",
ATTR_LEAP_BUTTON_NUMBER: 3,
ATTR_BUTTON_NUMBER: 3,
ATTR_LEAP_BUTTON_NUMBER: 1,
ATTR_BUTTON_NUMBER: 1,
ATTR_DEVICE_NAME: "Pico",
ATTR_AREA_NAME: "Living Room",
ATTR_AREA_NAME: "Dining Room",
ATTR_ACTION: "press",
},
),
],
)
assert event1["name"] == "Living Room Pico"
assert event1["name"] == "Dining Room Pico"
assert event1["domain"] == DOMAIN
assert event1["message"] == "press raise"
assert event1["message"] == "press stop"