Add KNX select entity (#52026)

* select entity for knx

* validate select options

* lint

* phytonify

* Tweak

Co-authored-by: Franck Nijhof <frenck@frenck.nl>
This commit is contained in:
Matthias Alphart 2021-06-24 10:54:04 +02:00 committed by GitHub
parent aa56a21b45
commit 74db49fae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 169 additions and 1 deletions

View File

@ -51,6 +51,7 @@ from .schema import (
NotifySchema,
NumberSchema,
SceneSchema,
SelectSchema,
SensorSchema,
SwitchSchema,
WeatherSchema,
@ -96,6 +97,7 @@ CONFIG_SCHEMA = vol.Schema(
**NotifySchema.platform_node(),
**NumberSchema.platform_node(),
**SceneSchema.platform_node(),
**SelectSchema.platform_node(),
**SensorSchema.platform_node(),
**SwitchSchema.platform_node(),
**WeatherSchema.platform_node(),

View File

@ -32,7 +32,7 @@ CONF_KNX_INDIVIDUAL_ADDRESS: Final = "individual_address"
CONF_KNX_ROUTING: Final = "routing"
CONF_KNX_TUNNELING: Final = "tunneling"
CONF_RESET_AFTER: Final = "reset_after"
CONF_RESPOND_TO_READ = "respond_to_read"
CONF_RESPOND_TO_READ: Final = "respond_to_read"
CONF_STATE_ADDRESS: Final = "state_address"
CONF_SYNC_STATE: Final = "sync_state"
@ -59,6 +59,7 @@ class SupportedPlatforms(Enum):
NOTIFY = "notify"
NUMBER = "number"
SCENE = "scene"
SELECT = "select"
SENSOR = "sensor"
SWITCH = "switch"
WEATHER = "weather"

View File

@ -116,6 +116,33 @@ def numeric_type_validator(value: Any) -> str | int:
raise vol.Invalid(f"value '{value}' is not a valid numeric sensor type.")
def select_options_sub_validator(entity_config: OrderedDict) -> OrderedDict:
"""Validate a select entity options configuration."""
options_seen = set()
payloads_seen = set()
payload_length = entity_config[SelectSchema.CONF_PAYLOAD_LENGTH]
if payload_length == 0:
max_payload = 0x3F
else:
max_payload = 256 ** payload_length - 1
for opt in entity_config[SelectSchema.CONF_OPTIONS]:
option = opt[SelectSchema.CONF_OPTION]
payload = opt[SelectSchema.CONF_PAYLOAD]
if payload > max_payload:
raise vol.Invalid(
f"'payload: {payload}' for 'option: {option}' exceeds possible"
f" maximum of 'payload_length: {payload_length}': {max_payload}"
)
if option in options_seen:
raise vol.Invalid(f"duplicate item for 'option' not allowed: {option}")
options_seen.add(option)
if payload in payloads_seen:
raise vol.Invalid(f"duplicate item for 'payload' not allowed: {payload}")
payloads_seen.add(payload)
return entity_config
def sensor_type_validator(value: Any) -> str | int:
"""Validate that value is parsable as sensor type."""
if isinstance(value, (str, int)) and DPTBase.parse_transcoder(value) is not None:
@ -617,6 +644,40 @@ class SceneSchema(KNXPlatformSchema):
)
class SelectSchema(KNXPlatformSchema):
"""Voluptuous schema for KNX selects."""
PLATFORM_NAME = SupportedPlatforms.SELECT.value
CONF_OPTION = "option"
CONF_OPTIONS = "options"
CONF_PAYLOAD = "payload"
CONF_PAYLOAD_LENGTH = "payload_length"
DEFAULT_NAME = "KNX Select"
ENTITY_SCHEMA = vol.All(
vol.Schema(
{
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_SYNC_STATE, default=True): sync_state_validator,
vol.Optional(CONF_RESPOND_TO_READ, default=False): cv.boolean,
vol.Required(CONF_PAYLOAD_LENGTH): vol.All(
vol.Coerce(int), vol.Range(min=0, max=14)
),
vol.Required(CONF_OPTIONS): [
{
vol.Required(CONF_OPTION): vol.Coerce(str),
vol.Required(CONF_PAYLOAD): cv.positive_int,
}
],
vol.Required(KNX_ADDRESS): ga_list_validator,
vol.Optional(CONF_STATE_ADDRESS): ga_list_validator,
}
),
select_options_sub_validator,
)
class SensorSchema(KNXPlatformSchema):
"""Voluptuous schema for KNX sensors."""

View File

@ -0,0 +1,104 @@
"""Support for KNX/IP select entities."""
from __future__ import annotations
from xknx import XKNX
from xknx.devices import Device as XknxDevice, RawValue
from homeassistant.components.select import SelectEntity
from homeassistant.const import CONF_NAME, STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import (
CONF_RESPOND_TO_READ,
CONF_STATE_ADDRESS,
CONF_SYNC_STATE,
DOMAIN,
KNX_ADDRESS,
)
from .knx_entity import KnxEntity
from .schema import SelectSchema
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up number entities for KNX platform."""
if not discovery_info or not discovery_info["platform_config"]:
return
platform_config = discovery_info["platform_config"]
xknx: XKNX = hass.data[DOMAIN].xknx
async_add_entities(
KNXSelect(xknx, entity_config) for entity_config in platform_config
)
def _create_raw_value(xknx: XKNX, config: ConfigType) -> RawValue:
"""Return a KNX RawValue to be used within XKNX."""
return RawValue(
xknx,
name=config[CONF_NAME],
payload_length=config[SelectSchema.CONF_PAYLOAD_LENGTH],
group_address=config[KNX_ADDRESS],
group_address_state=config.get(CONF_STATE_ADDRESS),
respond_to_read=config[CONF_RESPOND_TO_READ],
sync_state=config[CONF_SYNC_STATE],
)
class KNXSelect(KnxEntity, SelectEntity, RestoreEntity):
"""Representation of a KNX number."""
def __init__(self, xknx: XKNX, config: ConfigType) -> None:
"""Initialize a KNX number."""
self._device: RawValue
super().__init__(_create_raw_value(xknx, config))
self._unique_id = f"{self._device.remote_value.group_address}"
self._option_payloads: dict[str, int] = {
option[SelectSchema.CONF_OPTION]: option[SelectSchema.CONF_PAYLOAD]
for option in config[SelectSchema.CONF_OPTIONS]
}
self._attr_options = list(self._option_payloads)
self._attr_current_option = None
async def async_added_to_hass(self) -> None:
"""Restore last state."""
await super().async_added_to_hass()
if not self._device.remote_value.readable and (
last_state := await self.async_get_last_state()
):
if last_state.state not in (STATE_UNKNOWN, STATE_UNAVAILABLE):
await self._device.remote_value.update_value(
self._option_payloads.get(last_state.state)
)
async def after_update_callback(self, device: XknxDevice) -> None:
"""Call after device was updated."""
self._attr_current_option = self.option_from_payload(
self._device.remote_value.value
)
await super().after_update_callback(device)
def option_from_payload(self, payload: int | None) -> str | None:
"""Return the option a given payload is assigned to."""
try:
return next(
key for key, value in self._option_payloads.items() if value == payload
)
except StopIteration:
return None
async def async_select_option(self, option: str) -> None:
"""Change the selected option."""
if payload := self._option_payloads.get(option):
await self._device.set(payload)
return
raise ValueError(f"Invalid option for {self.entity_id}: {option}")