Add service homeworks.send_command (#114059)

* Add service homeworks.send_command

* Translate exception
This commit is contained in:
Erik Montnemery 2024-04-03 09:10:10 +02:00 committed by GitHub
parent adbaed2c6d
commit d058615961
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 190 additions and 2 deletions

View File

@ -2,8 +2,10 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass from dataclasses import dataclass
import logging import logging
from time import sleep
from typing import Any from typing import Any
from pyhomeworks.pyhomeworks import HW_BUTTON_PRESSED, HW_BUTTON_RELEASED, Homeworks from pyhomeworks.pyhomeworks import HW_BUTTON_PRESSED, HW_BUTTON_RELEASED, Homeworks
@ -18,8 +20,8 @@ from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_STOP,
Platform, Platform,
) )
from homeassistant.core import Event, HomeAssistant, callback from homeassistant.core import Event, HomeAssistant, ServiceCall, callback
from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.exceptions import ConfigEntryNotReady, ServiceValidationError
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.debounce import Debouncer from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send
@ -40,6 +42,8 @@ _LOGGER = logging.getLogger(__name__)
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.BUTTON, Platform.LIGHT] PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.BUTTON, Platform.LIGHT]
CONF_COMMAND = "command"
EVENT_BUTTON_PRESS = "homeworks_button_press" EVENT_BUTTON_PRESS = "homeworks_button_press"
EVENT_BUTTON_RELEASE = "homeworks_button_release" EVENT_BUTTON_RELEASE = "homeworks_button_release"
@ -77,6 +81,13 @@ CONFIG_SCHEMA = vol.Schema(
extra=vol.ALLOW_EXTRA, extra=vol.ALLOW_EXTRA,
) )
SERVICE_SEND_COMMAND_SCHEMA = vol.Schema(
{
vol.Required(CONF_CONTROLLER_ID): str,
vol.Required(CONF_COMMAND): vol.All(cv.ensure_list, [str]),
}
)
@dataclass @dataclass
class HomeworksData: class HomeworksData:
@ -87,6 +98,66 @@ class HomeworksData:
keypads: dict[str, HomeworksKeypad] keypads: dict[str, HomeworksKeypad]
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Set up services for Lutron Homeworks Series 4 and 8 integration."""
async def async_call_service(service_call: ServiceCall) -> None:
"""Call the service."""
await async_send_command(hass, service_call.data)
hass.services.async_register(
DOMAIN,
"send_command",
async_call_service,
schema=SERVICE_SEND_COMMAND_SCHEMA,
)
async def async_send_command(hass: HomeAssistant, data: Mapping[str, Any]) -> None:
"""Send command to a controller."""
def get_controller_ids() -> list[str]:
"""Get homeworks data for the specified controller ID."""
return [data.controller_id for data in hass.data[DOMAIN].values()]
def get_homeworks_data(controller_id: str) -> HomeworksData | None:
"""Get homeworks data for the specified controller ID."""
data: HomeworksData
for data in hass.data[DOMAIN].values():
if data.controller_id == controller_id:
return data
return None
def send_commands(controller: Homeworks, commands: list[str]) -> None:
"""Send commands to controller."""
_LOGGER.debug("Send commands: %s", commands)
for command in commands:
if command.lower().startswith("delay"):
delay = int(command.partition(" ")[2])
_LOGGER.debug("Sleeping for %s ms", delay)
sleep(delay / 1000)
else:
_LOGGER.debug("Sending command '%s'", command)
# pylint: disable-next=protected-access
controller._send(command)
homeworks_data = get_homeworks_data(data[CONF_CONTROLLER_ID])
if not homeworks_data:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_controller_id",
translation_placeholders={
"controller_id": data[CONF_CONTROLLER_ID],
"controller_ids": ",".join(get_controller_ids()),
},
)
await hass.async_add_executor_job(
send_commands, homeworks_data.controller, data[CONF_COMMAND]
)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Start Homeworks controller.""" """Start Homeworks controller."""
@ -97,6 +168,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
) )
) )
async_setup_services(hass)
return True return True

View File

@ -0,0 +1,5 @@
{
"services": {
"send_command": "mdi:console"
}
}

View File

@ -0,0 +1,13 @@
send_command:
fields:
controller_id:
required: true
example: "lutron_homeworks"
selector:
text:
command:
required: true
example: "KBP, [02:08:02:01], 1"
selector:
text:
multiple: true

View File

@ -39,6 +39,11 @@
} }
} }
}, },
"exceptions": {
"invalid_controller_id": {
"message": "Invalid controller_id '{controller_id}', expected one of '{controller_ids}'"
}
},
"options": { "options": {
"error": { "error": {
"duplicated_addr": "The specified address is already in use", "duplicated_addr": "The specified address is already in use",
@ -142,5 +147,21 @@
"title": "[%key:component::homeworks::options::step::init::menu_options::select_edit_light%]" "title": "[%key:component::homeworks::options::step::init::menu_options::select_edit_light%]"
} }
} }
},
"services": {
"send_command": {
"name": "Send command",
"description": "Send custom command to a controller",
"fields": {
"command": {
"name": "Command",
"description": "Command to send to the controller. This can either be a single command or a list of commands."
},
"controller_id": {
"name": "Controller ID",
"description": "The controller to which to send command."
}
}
}
} }
} }

View File

@ -3,12 +3,14 @@
from unittest.mock import ANY, MagicMock from unittest.mock import ANY, MagicMock
from pyhomeworks.pyhomeworks import HW_BUTTON_PRESSED, HW_BUTTON_RELEASED from pyhomeworks.pyhomeworks import HW_BUTTON_PRESSED, HW_BUTTON_RELEASED
import pytest
from homeassistant.components.homeworks import EVENT_BUTTON_PRESS, EVENT_BUTTON_RELEASE from homeassistant.components.homeworks import EVENT_BUTTON_PRESS, EVENT_BUTTON_RELEASE
from homeassistant.components.homeworks.const import CONF_DIMMERS, CONF_KEYPADS, DOMAIN from homeassistant.components.homeworks.const import CONF_DIMMERS, CONF_KEYPADS, DOMAIN
from homeassistant.config_entries import ConfigEntryState from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_HOST, CONF_PORT from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ServiceValidationError
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry, async_capture_events from tests.common import MockConfigEntry, async_capture_events
@ -114,3 +116,77 @@ async def test_keypad_events(
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(press_events) == 1 assert len(press_events) == 1
assert len(release_events) == 1 assert len(release_events) == 1
async def test_send_command(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_homeworks: MagicMock,
) -> None:
"""Test the send command service."""
mock_controller = MagicMock()
mock_homeworks.return_value = mock_controller
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
mock_controller._send.reset_mock()
await hass.services.async_call(
DOMAIN,
"send_command",
{"controller_id": "main_controller", "command": "KBP, [02:08:02:01], 1"},
blocking=True,
)
assert len(mock_controller._send.mock_calls) == 1
assert mock_controller._send.mock_calls[0][1] == ("KBP, [02:08:02:01], 1",)
mock_controller._send.reset_mock()
await hass.services.async_call(
DOMAIN,
"send_command",
{
"controller_id": "main_controller",
"command": [
"KBP, [02:08:02:01], 1",
"KBH, [02:08:02:01], 1",
"KBR, [02:08:02:01], 1",
],
},
blocking=True,
)
assert len(mock_controller._send.mock_calls) == 3
assert mock_controller._send.mock_calls[0][1] == ("KBP, [02:08:02:01], 1",)
assert mock_controller._send.mock_calls[1][1] == ("KBH, [02:08:02:01], 1",)
assert mock_controller._send.mock_calls[2][1] == ("KBR, [02:08:02:01], 1",)
mock_controller._send.reset_mock()
await hass.services.async_call(
DOMAIN,
"send_command",
{
"controller_id": "main_controller",
"command": [
"KBP, [02:08:02:01], 1",
"delay 50",
"KBH, [02:08:02:01], 1",
"dElAy 100",
"KBR, [02:08:02:01], 1",
],
},
blocking=True,
)
assert len(mock_controller._send.mock_calls) == 3
assert mock_controller._send.mock_calls[0][1] == ("KBP, [02:08:02:01], 1",)
assert mock_controller._send.mock_calls[1][1] == ("KBH, [02:08:02:01], 1",)
assert mock_controller._send.mock_calls[2][1] == ("KBR, [02:08:02:01], 1",)
mock_controller._send.reset_mock()
with pytest.raises(ServiceValidationError):
await hass.services.async_call(
DOMAIN,
"send_command",
{"controller_id": "unknown_controller", "command": "KBP, [02:08:02:01], 1"},
blocking=True,
)
assert len(mock_controller._send.mock_calls) == 0