diff --git a/homeassistant/components/shelly/const.py b/homeassistant/components/shelly/const.py index e78a6f1a59d..c8fa72606d6 100644 --- a/homeassistant/components/shelly/const.py +++ b/homeassistant/components/shelly/const.py @@ -116,6 +116,10 @@ BATTERY_DEVICES_WITH_PERMANENT_CONNECTION: Final = [ # Button/Click events for Block & RPC devices EVENT_SHELLY_CLICK: Final = "shelly.click" +SHELLY_EMIT_EVENT_PATTERN: Final = re.compile( + r"(?:Shelly\s*\.\s*emitEvent\s*\(\s*[\"'`])(\w*)" +) + ATTR_CLICK_TYPE: Final = "click_type" ATTR_CHANNEL: Final = "channel" ATTR_DEVICE: Final = "device" diff --git a/homeassistant/components/shelly/event.py b/homeassistant/components/shelly/event.py index 372d73dea3c..78093bec8aa 100644 --- a/homeassistant/components/shelly/event.py +++ b/homeassistant/components/shelly/event.py @@ -6,6 +6,7 @@ from collections.abc import Callable from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Final +from aioshelly.ble.const import BLE_SCRIPT_NAME from aioshelly.block_device import Block from aioshelly.const import MODEL_I3, RPC_GENERATIONS @@ -28,10 +29,12 @@ from .const import ( from .coordinator import ShellyBlockCoordinator, ShellyConfigEntry, ShellyRpcCoordinator from .entity import ShellyBlockEntity from .utils import ( + async_remove_orphaned_entities, async_remove_shelly_entity, get_device_entry_gen, get_rpc_entity_name, get_rpc_key_instances, + get_rpc_script_event_types, is_block_momentary_input, is_rpc_momentary_input, ) @@ -68,6 +71,13 @@ RPC_EVENT: Final = ShellyRpcEventDescription( config, status, key ), ) +SCRIPT_EVENT: Final = ShellyRpcEventDescription( + key="script", + translation_key="script", + device_class=None, + entity_registry_enabled_default=False, + has_entity_name=True, +) async def async_setup_entry( @@ -95,6 +105,33 @@ async def async_setup_entry( async_remove_shelly_entity(hass, EVENT_DOMAIN, unique_id) else: entities.append(ShellyRpcEvent(coordinator, key, RPC_EVENT)) + + script_instances = get_rpc_key_instances( + coordinator.device.status, SCRIPT_EVENT.key + ) + for script in script_instances: + script_name = get_rpc_entity_name(coordinator.device, script) + if script_name == BLE_SCRIPT_NAME: + continue + + event_types = await get_rpc_script_event_types( + coordinator.device, int(script.split(":")[-1]) + ) + if not event_types: + continue + + entities.append(ShellyRpcScriptEvent(coordinator, script, event_types)) + + # If a script is removed, from the device configuration, we need to remove orphaned entities + async_remove_orphaned_entities( + hass, + config_entry.entry_id, + coordinator.mac, + EVENT_DOMAIN, + coordinator.device.status, + "script", + ) + else: coordinator = config_entry.runtime_data.block if TYPE_CHECKING: @@ -170,7 +207,7 @@ class ShellyRpcEvent(CoordinatorEntity[ShellyRpcCoordinator], EventEntity): ) -> None: """Initialize Shelly entity.""" super().__init__(coordinator) - self.input_index = int(key.split(":")[-1]) + self.event_id = int(key.split(":")[-1]) self._attr_device_info = DeviceInfo( connections={(CONNECTION_NETWORK_MAC, coordinator.mac)} ) @@ -181,6 +218,7 @@ class ShellyRpcEvent(CoordinatorEntity[ShellyRpcCoordinator], EventEntity): async def async_added_to_hass(self) -> None: """When entity is added to hass.""" await super().async_added_to_hass() + self.async_on_remove( self.coordinator.async_subscribe_input_events(self._async_handle_event) ) @@ -188,6 +226,42 @@ class ShellyRpcEvent(CoordinatorEntity[ShellyRpcCoordinator], EventEntity): @callback def _async_handle_event(self, event: dict[str, Any]) -> None: """Handle the demo button event.""" - if event["id"] == self.input_index: + if event["id"] == self.event_id: self._trigger_event(event["event"]) self.async_write_ha_state() + + +class ShellyRpcScriptEvent(ShellyRpcEvent): + """Represent RPC script event entity.""" + + def __init__( + self, + coordinator: ShellyRpcCoordinator, + key: str, + event_types: list[str], + ) -> None: + """Initialize Shelly script event entity.""" + super().__init__(coordinator, key, SCRIPT_EVENT) + + self.component = key + self._attr_event_types = event_types + + async def async_added_to_hass(self) -> None: + """When entity is added to hass.""" + await super(CoordinatorEntity, self).async_added_to_hass() + + self.async_on_remove( + self.coordinator.async_subscribe_events(self._async_handle_event) + ) + + @callback + def _async_handle_event(self, event: dict[str, Any]) -> None: + """Handle script event.""" + if event.get("component") == self.component: + event_type = event.get("event") + if event_type not in self.event_types: + # This can happen if we didn't find this event type in the script + return + + self._trigger_event(event_type, event.get("data")) + self.async_write_ha_state() diff --git a/homeassistant/components/shelly/utils.py b/homeassistant/components/shelly/utils.py index 81766c65388..fa310104424 100644 --- a/homeassistant/components/shelly/utils.py +++ b/homeassistant/components/shelly/utils.py @@ -56,6 +56,7 @@ from .const import ( RPC_INPUTS_EVENTS_TYPES, SHBTN_INPUTS_EVENTS_TYPES, SHBTN_MODELS, + SHELLY_EMIT_EVENT_PATTERN, SHIX3_1_INPUTS_EVENTS_TYPES, UPTIME_DEVIATION, VIRTUAL_COMPONENTS_MAP, @@ -598,3 +599,10 @@ def get_rpc_ws_url(hass: HomeAssistant) -> str | None: url = URL(raw_url) ws_url = url.with_scheme("wss" if url.scheme == "https" else "ws") return str(ws_url.joinpath(API_WS_URL.removeprefix("/"))) + + +async def get_rpc_script_event_types(device: RpcDevice, id: int) -> list[str]: + """Return a list of event types for a specific script.""" + code_response = await device.script_getcode(id) + matches = SHELLY_EMIT_EVENT_PATTERN.finditer(code_response["data"]) + return sorted([*{str(event_type.group(1)) for event_type in matches}]) diff --git a/tests/components/shelly/conftest.py b/tests/components/shelly/conftest.py index 2279a605403..b3074742949 100644 --- a/tests/components/shelly/conftest.py +++ b/tests/components/shelly/conftest.py @@ -2,6 +2,15 @@ from unittest.mock import AsyncMock, Mock, PropertyMock, patch +from aioshelly.ble.const import ( + BLE_CODE, + BLE_SCAN_RESULT_EVENT, + BLE_SCAN_RESULT_VERSION, + BLE_SCRIPT_NAME, + VAR_ACTIVE, + VAR_EVENT_TYPE, + VAR_VERSION, +) from aioshelly.block_device import BlockDevice, BlockUpdateType from aioshelly.const import MODEL_1, MODEL_25, MODEL_PLUS_2PM from aioshelly.rpc_device import RpcDevice, RpcUpdateType @@ -201,6 +210,9 @@ MOCK_CONFIG = { "wifi": {"sta": {"enable": True}, "sta1": {"enable": False}}, "ws": {"enable": False, "server": None}, "voltmeter:100": {"xvoltage": {"unit": "ppm"}}, + "script:1": {"id": 1, "name": "test_script.js", "enable": True}, + "script:2": {"id": 2, "name": "test_script_2.js", "enable": False}, + "script:3": {"id": 3, "name": BLE_SCRIPT_NAME, "enable": False}, } @@ -335,6 +347,15 @@ MOCK_STATUS_RPC = { "current_C": 12.3, "output": True, }, + "script:1": { + "id": 1, + "running": True, + "mem_used": 826, + "mem_peak": 1666, + "mem_free": 24360, + }, + "script:2": {"id": 2, "running": False}, + "script:3": {"id": 3, "running": False}, "humidity:0": {"rh": 44.4}, "sys": { "available_updates": { @@ -347,6 +368,28 @@ MOCK_STATUS_RPC = { "wifi": {"rssi": -63}, } +MOCK_SCRIPTS = [ + """" +function eventHandler(event, userdata) { + if (typeof event.component !== "string") + return; + + let component = event.component.substring(0, 5); + if (component === "input") { + let id = Number(event.component.substring(6)); + Shelly.emitEvent("input_event", { id: id }); + } +} + +Shelly.addEventHandler(eventHandler); +Shelly.emitEvent("script_start"); +""", + 'console.log("Hello World!")', + BLE_CODE.replace(VAR_ACTIVE, "true") + .replace(VAR_EVENT_TYPE, BLE_SCAN_RESULT_EVENT) + .replace(VAR_VERSION, str(BLE_SCAN_RESULT_VERSION)), +] + @pytest.fixture(autouse=True) def mock_coap(): @@ -430,6 +473,9 @@ def _mock_rpc_device(version: str | None = None): firmware_version="some fw string", initialized=True, connected=True, + script_getcode=AsyncMock( + side_effect=lambda script_id: {"data": MOCK_SCRIPTS[script_id - 1]} + ), ) type(device).name = PropertyMock(return_value="Test name") return device diff --git a/tests/components/shelly/snapshots/test_event.ambr b/tests/components/shelly/snapshots/test_event.ambr new file mode 100644 index 00000000000..51129b7e249 --- /dev/null +++ b/tests/components/shelly/snapshots/test_event.ambr @@ -0,0 +1,69 @@ +# serializer version: 1 +# name: test_rpc_script_1_event[event.test_name_test_script_js-entry] + EntityRegistryEntrySnapshot({ + 'aliases': set({ + }), + 'area_id': None, + 'capabilities': dict({ + 'event_types': list([ + 'input_event', + 'script_start', + ]), + }), + 'config_entry_id': , + 'device_class': None, + 'device_id': , + 'disabled_by': None, + 'domain': 'event', + 'entity_category': None, + 'entity_id': 'event.test_name_test_script_js', + 'has_entity_name': True, + 'hidden_by': None, + 'icon': None, + 'id': , + 'labels': set({ + }), + 'name': None, + 'options': dict({ + }), + 'original_device_class': None, + 'original_icon': None, + 'original_name': 'test_script.js', + 'platform': 'shelly', + 'previous_unique_id': None, + 'supported_features': 0, + 'translation_key': 'script', + 'unique_id': '123456789ABC-script:1', + 'unit_of_measurement': None, + }) +# --- +# name: test_rpc_script_1_event[event.test_name_test_script_js-state] + StateSnapshot({ + 'attributes': ReadOnlyDict({ + 'event_type': None, + 'event_types': list([ + 'input_event', + 'script_start', + ]), + 'friendly_name': 'Test name test_script.js', + }), + 'context': , + 'entity_id': 'event.test_name_test_script_js', + 'last_changed': , + 'last_reported': , + 'last_updated': , + 'state': 'unknown', + }) +# --- +# name: test_rpc_script_2_event[event.test_name_test_script_2_js-entry] + None +# --- +# name: test_rpc_script_2_event[event.test_name_test_script_2_js-state] + None +# --- +# name: test_rpc_script_ble_event[event.test_name_aioshelly_ble_integration-entry] + None +# --- +# name: test_rpc_script_ble_event[event.test_name_aioshelly_ble_integration-state] + None +# --- diff --git a/tests/components/shelly/test_event.py b/tests/components/shelly/test_event.py index 2465b016808..e184c154697 100644 --- a/tests/components/shelly/test_event.py +++ b/tests/components/shelly/test_event.py @@ -2,9 +2,11 @@ from unittest.mock import Mock +from aioshelly.ble.const import BLE_SCRIPT_NAME from aioshelly.const import MODEL_I3 import pytest from pytest_unordered import unordered +from syrupy import SnapshotAssertion from homeassistant.components.event import ( ATTR_EVENT_TYPE, @@ -64,6 +66,99 @@ async def test_rpc_button( assert state.attributes.get(ATTR_EVENT_TYPE) == "single_push" +@pytest.mark.usefixtures("entity_registry_enabled_by_default") +async def test_rpc_script_1_event( + hass: HomeAssistant, + mock_rpc_device: Mock, + entity_registry: EntityRegistry, + monkeypatch: pytest.MonkeyPatch, + snapshot: SnapshotAssertion, +) -> None: + """Test script event.""" + await init_integration(hass, 2) + entity_id = "event.test_name_test_script_js" + + state = hass.states.get(entity_id) + assert state == snapshot(name=f"{entity_id}-state") + + entry = entity_registry.async_get(entity_id) + assert entry == snapshot(name=f"{entity_id}-entry") + + inject_rpc_device_event( + monkeypatch, + mock_rpc_device, + { + "events": [ + { + "component": "script:1", + "id": 1, + "event": "script_start", + "ts": 1668522399.2, + } + ], + "ts": 1668522399.2, + }, + ) + await hass.async_block_till_done() + + state = hass.states.get(entity_id) + assert state.attributes.get(ATTR_EVENT_TYPE) == "script_start" + + inject_rpc_device_event( + monkeypatch, + mock_rpc_device, + { + "events": [ + { + "component": "script:1", + "id": 1, + "event": "unknown_event", + "ts": 1668522399.2, + } + ], + "ts": 1668522399.2, + }, + ) + await hass.async_block_till_done() + + state = hass.states.get(entity_id) + assert state.attributes.get(ATTR_EVENT_TYPE) != "unknown_event" + + +@pytest.mark.usefixtures("entity_registry_enabled_by_default") +async def test_rpc_script_2_event( + hass: HomeAssistant, + entity_registry: EntityRegistry, + snapshot: SnapshotAssertion, +) -> None: + """Test that scripts without any emitEvent will not get an event entity.""" + await init_integration(hass, 2) + entity_id = "event.test_name_test_script_2_js" + + state = hass.states.get(entity_id) + assert state == snapshot(name=f"{entity_id}-state") + + entry = entity_registry.async_get(entity_id) + assert entry == snapshot(name=f"{entity_id}-entry") + + +@pytest.mark.usefixtures("entity_registry_enabled_by_default") +async def test_rpc_script_ble_event( + hass: HomeAssistant, + entity_registry: EntityRegistry, + snapshot: SnapshotAssertion, +) -> None: + """Test that the ble script will not get an event entity.""" + await init_integration(hass, 2) + entity_id = f"event.test_name_{BLE_SCRIPT_NAME}" + + state = hass.states.get(entity_id) + assert state == snapshot(name=f"{entity_id}-state") + + entry = entity_registry.async_get(entity_id) + assert entry == snapshot(name=f"{entity_id}-entry") + + async def test_rpc_event_removal( hass: HomeAssistant, mock_rpc_device: Mock,