KNX services send and event_register accept multiple group addresses (#46908)

* send and event_register service accept lists of group addresses

* remove lambda

* object selector for lists

* knx.read takes lists too
This commit is contained in:
Matthias Alphart 2021-03-01 11:51:59 +01:00 committed by GitHub
parent 4c42e469b3
commit 92afcb6b4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 40 deletions

View File

@ -1,6 +1,7 @@
"""Support KNX devices."""
import asyncio
import logging
from typing import Union
import voluptuous as vol
from xknx import XKNX
@ -148,7 +149,10 @@ CONFIG_SCHEMA = vol.Schema(
SERVICE_KNX_SEND_SCHEMA = vol.Any(
vol.Schema(
{
vol.Required(CONF_ADDRESS): ga_validator,
vol.Required(CONF_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Required(SERVICE_KNX_ATTR_PAYLOAD): cv.match_all,
vol.Required(SERVICE_KNX_ATTR_TYPE): vol.Any(int, float, str),
}
@ -156,7 +160,10 @@ SERVICE_KNX_SEND_SCHEMA = vol.Any(
vol.Schema(
# without type given payload is treated as raw bytes
{
vol.Required(CONF_ADDRESS): ga_validator,
vol.Required(CONF_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Required(SERVICE_KNX_ATTR_PAYLOAD): vol.Any(
cv.positive_int, [cv.positive_int]
),
@ -175,7 +182,10 @@ SERVICE_KNX_READ_SCHEMA = vol.Schema(
SERVICE_KNX_EVENT_REGISTER_SCHEMA = vol.Schema(
{
vol.Required(CONF_ADDRESS): ga_validator,
vol.Required(CONF_ADDRESS): vol.All(
cv.ensure_list,
[ga_validator],
),
vol.Optional(SERVICE_KNX_ATTR_REMOVE, default=False): cv.boolean,
}
)
@ -401,21 +411,26 @@ class KNXModule:
async def service_event_register_modify(self, call):
"""Service for adding or removing a GroupAddress to the knx_event filter."""
group_address = GroupAddress(call.data[CONF_ADDRESS])
attr_address = call.data.get(CONF_ADDRESS)
group_addresses = map(GroupAddress, attr_address)
if call.data.get(SERVICE_KNX_ATTR_REMOVE):
try:
self._knx_event_callback.group_addresses.remove(group_address)
except ValueError:
_LOGGER.warning(
"Service event_register could not remove event for '%s'",
group_address,
)
elif group_address not in self._knx_event_callback.group_addresses:
self._knx_event_callback.group_addresses.append(group_address)
_LOGGER.debug(
"Service event_register registered event for '%s'",
group_address,
)
for group_address in group_addresses:
try:
self._knx_event_callback.group_addresses.remove(group_address)
except ValueError:
_LOGGER.warning(
"Service event_register could not remove event for '%s'",
str(group_address),
)
else:
for group_address in group_addresses:
if group_address not in self._knx_event_callback.group_addresses:
self._knx_event_callback.group_addresses.append(group_address)
_LOGGER.debug(
"Service event_register registered event for '%s'",
str(group_address),
)
async def service_exposure_register_modify(self, call):
"""Service for adding or removing an exposure to KNX bus."""
@ -450,26 +465,27 @@ class KNXModule:
async def service_send_to_knx_bus(self, call):
"""Service for sending an arbitrary KNX message to the KNX bus."""
attr_payload = call.data.get(SERVICE_KNX_ATTR_PAYLOAD)
attr_address = call.data.get(CONF_ADDRESS)
attr_payload = call.data.get(SERVICE_KNX_ATTR_PAYLOAD)
attr_type = call.data.get(SERVICE_KNX_ATTR_TYPE)
def calculate_payload(attr_payload):
"""Calculate payload depending on type of attribute."""
if attr_type is not None:
transcoder = DPTBase.parse_transcoder(attr_type)
if transcoder is None:
raise ValueError(f"Invalid type for knx.send service: {attr_type}")
return DPTArray(transcoder.to_knx(attr_payload))
if isinstance(attr_payload, int):
return DPTBinary(attr_payload)
return DPTArray(attr_payload)
payload: Union[DPTBinary, DPTArray]
if attr_type is not None:
transcoder = DPTBase.parse_transcoder(attr_type)
if transcoder is None:
raise ValueError(f"Invalid type for knx.send service: {attr_type}")
payload = DPTArray(transcoder.to_knx(attr_payload))
elif isinstance(attr_payload, int):
payload = DPTBinary(attr_payload)
else:
payload = DPTArray(attr_payload)
telegram = Telegram(
destination_address=GroupAddress(attr_address),
payload=GroupValueWrite(calculate_payload(attr_payload)),
)
await self.xknx.telegrams.put(telegram)
for address in attr_address:
telegram = Telegram(
destination_address=GroupAddress(address),
payload=GroupValueWrite(payload),
)
await self.xknx.telegrams.put(telegram)
async def service_read_to_knx_bus(self, call):
"""Service for sending a GroupValueRead telegram to the KNX bus."""

View File

@ -4,11 +4,11 @@ send:
fields:
address:
name: "Group address"
description: "Group address(es) to write to."
description: "Group address(es) to write to. Lists will send to multiple group addresses successively."
required: true
example: "1/1/0"
selector:
text:
object:
payload:
name: "Payload"
description: "Payload to send to the bus. Integers are treated as DPT 1/2/3 payloads. For DPTs > 6 bits send a list. Each value represents 1 octet (0-255). Pad with 0 to DPT byte length."
@ -33,21 +33,21 @@ read:
required: true
example: "1/1/0"
selector:
text:
object:
event_register:
name: "Register knx_event"
description: "Add or remove single group address to knx_event filter for triggering `knx_event`s. Only addresses added with this service can be removed."
description: "Add or remove group addresses to knx_event filter for triggering `knx_event`s. Only addresses added with this service can be removed."
fields:
address:
name: "Group address"
description: "Group address that shall be added or removed."
description: "Group address(es) that shall be added or removed. Lists are allowed."
required: true
example: "1/1/0"
selector:
text:
object:
remove:
name: "Remove event registration"
description: "Optional. If `True` the group address will be removed."
description: "Optional. If `True` the group address(es) will be removed."
default: false
selector:
boolean: