Add rfxtrx ability to send a raw command to device (#37793)

* Add the ability to send a raw command to device

* Add a test for new service

* Use async test

* Sort includes
This commit is contained in:
Joakim Plate 2020-07-13 16:54:52 +02:00 committed by GitHub
parent dbcd5f4c2c
commit ccbc3b5e39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 1 deletions

View File

@ -26,7 +26,12 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import load_platform
from homeassistant.helpers.entity import Entity
from .const import DEVICE_PACKET_TYPE_LIGHTING4, EVENT_RFXTRX_EVENT
from .const import (
ATTR_EVENT,
DEVICE_PACKET_TYPE_LIGHTING4,
EVENT_RFXTRX_EVENT,
SERVICE_SEND,
)
DOMAIN = "rfxtrx"
@ -78,6 +83,16 @@ _LOGGER = logging.getLogger(__name__)
DATA_RFXOBJECT = "rfxobject"
def _bytearray_string(data):
val = cv.string(data)
try:
return bytearray.fromhex(val)
except ValueError:
raise vol.Invalid("Data must be a hex string with multiple of two characters")
SERVICE_SEND_SCHEMA = vol.Schema({ATTR_EVENT: _bytearray_string})
DEVICE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
@ -187,6 +202,12 @@ def setup(hass, config):
load_platform(hass, "binary_sensor", DOMAIN, config[DOMAIN], config)
load_platform(hass, "sensor", DOMAIN, config[DOMAIN], config)
def send(call):
event = call.data[ATTR_EVENT]
rfx_object.transport.send(event)
hass.services.async_register(DOMAIN, SERVICE_SEND, send, schema=SERVICE_SEND_SCHEMA)
return True

View File

@ -15,6 +15,10 @@ COMMAND_OFF_LIST = [
"Close (inline relay)",
]
ATTR_EVENT = "event"
SERVICE_SEND = "send"
DEVICE_PACKET_TYPE_LIGHTING4 = 0x13
EVENT_RFXTRX_EVENT = "rfxtrx_event"

View File

@ -0,0 +1,6 @@
send:
description: Sends a raw event on radio.
fields:
event:
description: A hexadecimal string to send.
example: "0b11009e00e6116202020070"

View File

@ -6,6 +6,8 @@ from homeassistant.setup import async_setup_component
from . import _signal_event
from tests.async_mock import call
async def test_valid_config(hass):
"""Test configuration."""
@ -104,3 +106,19 @@ async def test_fire_event(hass):
"values": {"Sound": 9, "Battery numeric": 0, "Rssi numeric": 7},
},
]
async def test_send(hass, rfxtrx):
"""Test configuration."""
assert await async_setup_component(
hass, "rfxtrx", {"rfxtrx": {"device": "/dev/null"}}
)
await hass.async_block_till_done()
await hass.services.async_call(
"rfxtrx", "send", {"event": "0a520802060101ff0f0269"}, blocking=True
)
assert rfxtrx.transport.send.mock_calls == [
call(bytearray(b"\x0a\x52\x08\x02\x06\x01\x01\xff\x0f\x02\x69"))
]