Add Tasmota device trigger (#41590)

* Add Tasmota device trigger

* Correct import

* Remove useless try-except

* Remove commented out code

* Align with hatasmota 0.0.14

* Update according to review comments
This commit is contained in:
Erik Montnemery 2020-10-16 08:16:07 +02:00 committed by GitHub
parent 731d617c5e
commit 9e9f841f35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1212 additions and 1 deletions

View File

@ -4,3 +4,5 @@ CONF_DISCOVERY_PREFIX = "discovery_prefix"
DEFAULT_PREFIX = "tasmota/discovery"
DOMAIN = "tasmota"
TASMOTA_EVENT = "tasmota_event"

View File

@ -0,0 +1,36 @@
"""Provides device automations for Tasmota."""
import logging
from hatasmota.const import AUTOMATION_TYPE_TRIGGER
from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from . import device_trigger
from .discovery import TASMOTA_DISCOVERY_ENTITY_NEW
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, config_entry):
"""Set up Tasmota device automation dynamically through discovery."""
async def async_device_removed(event):
"""Handle the removal of a device."""
if event.data["action"] != "remove":
return
await device_trigger.async_device_removed(hass, event.data["device_id"])
async def async_discover(tasmota_automation, discovery_hash):
"""Discover and add a Tasmota device automation."""
if tasmota_automation.automation_type == AUTOMATION_TYPE_TRIGGER:
await device_trigger.async_setup_trigger(
hass, tasmota_automation, config_entry, discovery_hash
)
async_dispatcher_connect(
hass,
TASMOTA_DISCOVERY_ENTITY_NEW.format("device_automation", "tasmota"),
async_discover,
)
hass.bus.async_listen(EVENT_DEVICE_REGISTRY_UPDATED, async_device_removed)

View File

@ -0,0 +1,291 @@
"""Provides device automations for MQTT."""
import logging
from typing import Callable, List, Optional
import attr
from hatasmota.trigger import TasmotaTrigger
import voluptuous as vol
from homeassistant.components.automation import AutomationActionType
from homeassistant.components.device_automation import TRIGGER_BASE_SCHEMA
from homeassistant.components.homeassistant.triggers import event as event_trigger
from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_PLATFORM, CONF_TYPE
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from .const import DOMAIN, TASMOTA_EVENT
from .discovery import TASMOTA_DISCOVERY_ENTITY_UPDATED, clear_discovery_hash
_LOGGER = logging.getLogger(__name__)
CONF_DISCOVERY_ID = "discovery_id"
CONF_SUBTYPE = "subtype"
DEVICE = "device"
TRIGGER_SCHEMA = TRIGGER_BASE_SCHEMA.extend(
{
vol.Required(CONF_PLATFORM): DEVICE,
vol.Required(CONF_DOMAIN): DOMAIN,
vol.Required(CONF_DEVICE_ID): str,
vol.Required(CONF_DISCOVERY_ID): str,
vol.Required(CONF_TYPE): cv.string,
vol.Required(CONF_SUBTYPE): cv.string,
}
)
DEVICE_TRIGGERS = "tasmota_device_triggers"
@attr.s(slots=True)
class TriggerInstance:
"""Attached trigger settings."""
action: AutomationActionType = attr.ib()
automation_info: dict = attr.ib()
trigger: "Trigger" = attr.ib()
remove: Optional[CALLBACK_TYPE] = attr.ib(default=None)
async def async_attach_trigger(self):
"""Attach event trigger."""
event_config = {
event_trigger.CONF_PLATFORM: "event",
event_trigger.CONF_EVENT_TYPE: TASMOTA_EVENT,
event_trigger.CONF_EVENT_DATA: {
"mac": self.trigger.tasmota_trigger.cfg.mac,
"source": self.trigger.tasmota_trigger.cfg.source,
"event": self.trigger.tasmota_trigger.cfg.event,
},
}
event_config = event_trigger.TRIGGER_SCHEMA(event_config)
if self.remove:
self.remove()
# Note: No lock needed, event_trigger.async_attach_trigger is an synchronous function
self.remove = await event_trigger.async_attach_trigger(
self.trigger.hass,
event_config,
self.action,
self.automation_info,
platform_type="device",
)
@attr.s(slots=True)
class Trigger:
"""Device trigger settings."""
device_id: str = attr.ib()
discovery_hash: dict = attr.ib()
hass: HomeAssistantType = attr.ib()
remove_update_signal: Callable[[], None] = attr.ib()
subtype: str = attr.ib()
tasmota_trigger: TasmotaTrigger = attr.ib()
type: str = attr.ib()
trigger_instances: List[TriggerInstance] = attr.ib(factory=list)
async def add_trigger(self, action, automation_info):
"""Add Tasmota trigger."""
instance = TriggerInstance(action, automation_info, self)
self.trigger_instances.append(instance)
if self.tasmota_trigger is not None:
# If we know about the trigger, set it up
await instance.async_attach_trigger()
@callback
def async_remove() -> None:
"""Remove trigger."""
if instance not in self.trigger_instances:
raise HomeAssistantError("Can't remove trigger twice")
if instance.remove:
instance.remove()
self.trigger_instances.remove(instance)
return async_remove
def detach_trigger(self):
"""Remove Tasmota device trigger."""
# Mark trigger as unknown
self.tasmota_trigger = None
# Unsubscribe if this trigger is in use
for trig in self.trigger_instances:
if trig.remove:
trig.remove()
trig.remove = None
async def arm_tasmota_trigger(self):
"""Arm Tasmota trigger: subscribe to MQTT topics and fire events."""
@callback
def _on_trigger():
data = {
"mac": self.tasmota_trigger.cfg.mac,
"source": self.tasmota_trigger.cfg.source,
"event": self.tasmota_trigger.cfg.event,
}
self.hass.bus.async_fire(
TASMOTA_EVENT,
data,
)
self.tasmota_trigger.set_on_trigger_callback(_on_trigger)
await self.tasmota_trigger.subscribe_topics()
async def set_tasmota_trigger(self, tasmota_trigger, remove_update_signal):
"""Set Tasmota trigger."""
await self.update_tasmota_trigger(tasmota_trigger.cfg, remove_update_signal)
self.tasmota_trigger = tasmota_trigger
for trig in self.trigger_instances:
await trig.async_attach_trigger()
async def update_tasmota_trigger(self, tasmota_trigger_cfg, remove_update_signal):
"""Update Tasmota trigger."""
self.remove_update_signal = remove_update_signal
self.type = tasmota_trigger_cfg.type
self.subtype = tasmota_trigger_cfg.subtype
async def async_setup_trigger(hass, tasmota_trigger, config_entry, discovery_hash):
"""Set up a discovered Tasmota device trigger."""
discovery_id = tasmota_trigger.cfg.trigger_id
remove_update_signal = None
_LOGGER.debug(
"Discovered trigger with id: %s '%s'", discovery_id, tasmota_trigger.cfg
)
async def discovery_update(trigger_config):
"""Handle discovery update."""
_LOGGER.debug(
"Got update for trigger with hash: %s '%s'", discovery_hash, trigger_config
)
if not trigger_config.is_active:
# Empty trigger_config: Remove trigger
_LOGGER.debug("Removing trigger: %s", discovery_hash)
if discovery_id in hass.data[DEVICE_TRIGGERS]:
device_trigger = hass.data[DEVICE_TRIGGERS][discovery_id]
await device_trigger.tasmota_trigger.unsubscribe_topics()
device_trigger.detach_trigger()
clear_discovery_hash(hass, discovery_hash)
remove_update_signal()
return
device_trigger = hass.data[DEVICE_TRIGGERS][discovery_id]
if device_trigger.tasmota_trigger.config_same(trigger_config):
# Unchanged payload: Ignore to avoid unnecessary unsubscribe / subscribe
_LOGGER.debug("Ignoring unchanged update for: %s", discovery_hash)
return
# Non-empty, changed trigger_config: Update trigger
_LOGGER.debug("Updating trigger: %s", discovery_hash)
device_trigger.tasmota_trigger.config_update(trigger_config)
await device_trigger.update_tasmota_trigger(
trigger_config, remove_update_signal
)
await device_trigger.arm_tasmota_trigger()
return
remove_update_signal = async_dispatcher_connect(
hass, TASMOTA_DISCOVERY_ENTITY_UPDATED.format(*discovery_hash), discovery_update
)
device_registry = await hass.helpers.device_registry.async_get_registry()
device = device_registry.async_get_device(
set(),
{(CONNECTION_NETWORK_MAC, tasmota_trigger.cfg.mac)},
)
if device is None:
return
if DEVICE_TRIGGERS not in hass.data:
hass.data[DEVICE_TRIGGERS] = {}
if discovery_id not in hass.data[DEVICE_TRIGGERS]:
device_trigger = Trigger(
hass=hass,
device_id=device.id,
discovery_hash=discovery_hash,
subtype=tasmota_trigger.cfg.subtype,
tasmota_trigger=tasmota_trigger,
type=tasmota_trigger.cfg.type,
remove_update_signal=remove_update_signal,
)
hass.data[DEVICE_TRIGGERS][discovery_id] = device_trigger
else:
# This Tasmota trigger is wanted by device trigger(s), set them up
device_trigger = hass.data[DEVICE_TRIGGERS][discovery_id]
await device_trigger.set_tasmota_trigger(tasmota_trigger, remove_update_signal)
await device_trigger.arm_tasmota_trigger()
async def async_device_removed(hass: HomeAssistant, device_id: str):
"""Handle the removal of a Tasmota device - cleanup any device triggers."""
triggers = await async_get_triggers(hass, device_id)
for trig in triggers:
device_trigger = hass.data[DEVICE_TRIGGERS].pop(trig[CONF_DISCOVERY_ID])
if device_trigger:
discovery_hash = device_trigger.discovery_hash
await device_trigger.tasmota_trigger.unsubscribe_topics()
device_trigger.detach_trigger()
clear_discovery_hash(hass, discovery_hash)
device_trigger.remove_update_signal()
async def async_get_triggers(hass: HomeAssistant, device_id: str) -> List[dict]:
"""List device triggers for Tasmota devices."""
triggers = []
if DEVICE_TRIGGERS not in hass.data:
return triggers
for discovery_id, trig in hass.data[DEVICE_TRIGGERS].items():
if trig.device_id != device_id or trig.tasmota_trigger is None:
continue
trigger = {
"platform": "device",
"domain": "tasmota",
"device_id": device_id,
"type": trig.type,
"subtype": trig.subtype,
"discovery_id": discovery_id,
}
triggers.append(trigger)
return triggers
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
action: Callable,
automation_info: dict,
) -> CALLBACK_TYPE:
"""Attach a device trigger."""
if DEVICE_TRIGGERS not in hass.data:
hass.data[DEVICE_TRIGGERS] = {}
config = TRIGGER_SCHEMA(config)
device_id = config[CONF_DEVICE_ID]
discovery_id = config[CONF_DISCOVERY_ID]
if discovery_id not in hass.data[DEVICE_TRIGGERS]:
# The trigger has not (yet) been discovered, prepare it for later
hass.data[DEVICE_TRIGGERS][discovery_id] = Trigger(
hass=hass,
device_id=device_id,
discovery_hash=None,
remove_update_signal=None,
type=config[CONF_TYPE],
subtype=config[CONF_SUBTYPE],
tasmota_trigger=None,
)
return await hass.data[DEVICE_TRIGGERS][discovery_id].add_trigger(
action, automation_info
)

View File

@ -7,6 +7,8 @@ from hatasmota.discovery import (
get_device_config as tasmota_get_device_config,
get_entities_for_platform as tasmota_get_entities_for_platform,
get_entity as tasmota_get_entity,
get_trigger as tasmota_get_trigger,
get_triggers as tasmota_get_triggers,
has_entities_with_platform as tasmota_has_entities_with_platform,
unique_id_from_hash,
)
@ -110,6 +112,43 @@ async def async_start(
if not payload:
return
tasmota_triggers = tasmota_get_triggers(payload)
async with hass.data[DATA_CONFIG_ENTRY_LOCK]:
if any(trigger.is_active for trigger in tasmota_triggers):
config_entries_key = "device_automation.tasmota"
if config_entries_key not in hass.data[CONFIG_ENTRY_IS_SETUP]:
# Local import to avoid circular dependencies
# pylint: disable=import-outside-toplevel
from . import device_automation
await device_automation.async_setup_entry(hass, config_entry)
hass.data[CONFIG_ENTRY_IS_SETUP].add(config_entries_key)
for trigger_config in tasmota_triggers:
discovery_hash = (mac, "automation", "trigger", trigger_config.trigger_id)
if discovery_hash in hass.data[ALREADY_DISCOVERED]:
_LOGGER.debug(
"Trigger already added, sending update: %s",
discovery_hash,
)
async_dispatcher_send(
hass,
TASMOTA_DISCOVERY_ENTITY_UPDATED.format(*discovery_hash),
trigger_config,
)
elif trigger_config.is_active:
_LOGGER.info("Adding new trigger: %s", discovery_hash)
hass.data[ALREADY_DISCOVERED][discovery_hash] = None
tasmota_trigger = tasmota_get_trigger(trigger_config, tasmota_mqtt)
async_dispatcher_send(
hass,
TASMOTA_DISCOVERY_ENTITY_NEW.format("device_automation"),
tasmota_trigger,
discovery_hash,
)
for platform in SUPPORTED_PLATFORMS:
if not tasmota_has_entities_with_platform(payload, platform):
continue

View File

@ -10,7 +10,12 @@ from homeassistant.components.tasmota.const import (
)
from tests.async_mock import patch
from tests.common import MockConfigEntry, mock_device_registry, mock_registry
from tests.common import (
MockConfigEntry,
async_mock_service,
mock_device_registry,
mock_registry,
)
@pytest.fixture
@ -25,6 +30,12 @@ def entity_reg(hass):
return mock_registry(hass)
@pytest.fixture
def calls(hass):
"""Track calls to a mock service."""
return async_mock_service(hass, "test", "automation")
@pytest.fixture(autouse=True)
def disable_debounce():
"""Set MQTT debounce timer to zero."""

View File

@ -0,0 +1,832 @@
"""The tests for MQTT device triggers."""
import copy
import json
from hatasmota.switch import TasmotaSwitchTriggerConfig
import pytest
import homeassistant.components.automation as automation
from homeassistant.components.tasmota.const import DEFAULT_PREFIX, DOMAIN
from homeassistant.components.tasmota.device_trigger import async_attach_trigger
from homeassistant.setup import async_setup_component
from .test_common import DEFAULT_CONFIG
from tests.async_mock import patch
from tests.common import (
assert_lists_same,
async_fire_mqtt_message,
async_get_device_automations,
)
async def test_get_triggers(hass, device_reg, entity_reg, mqtt_mock, setup_tasmota):
"""Test we get the expected triggers from a discovered mqtt device."""
config = copy.deepcopy(DEFAULT_CONFIG)
config["swc"][0] = 0
mac = config["mac"]
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
expected_triggers = [
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
]
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
assert_lists_same(triggers, expected_triggers)
async def test_get_unknown_triggers(
hass, device_reg, entity_reg, mqtt_mock, setup_tasmota
):
"""Test we don't get unknown triggers."""
# Discover a device without device triggers
config = copy.deepcopy(DEFAULT_CONFIG)
config["swc"][0] = -1
mac = config["mac"]
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_0_2",
"type": "button_short_press",
"subtype": "button_1",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("short_press")},
},
},
]
},
)
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
assert_lists_same(triggers, [])
async def test_get_non_existing_triggers(
hass, device_reg, entity_reg, mqtt_mock, setup_tasmota
):
"""Test getting non existing triggers."""
# Discover a device without device triggers
config1 = copy.deepcopy(DEFAULT_CONFIG)
config1["swc"][0] = -1
mac = config1["mac"]
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
assert_lists_same(triggers, [])
@pytest.mark.no_fail_on_log_exception
async def test_discover_bad_triggers(
hass, device_reg, entity_reg, mqtt_mock, setup_tasmota
):
"""Test exception handling when discovering trigger."""
config = copy.deepcopy(DEFAULT_CONFIG)
config["swc"][0] = 0
mac = config["mac"]
# Trigger an exception when the entity is discovered
with patch(
"hatasmota.discovery.get_switch_triggers",
return_value=[object()],
):
async_fire_mqtt_message(
hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config)
)
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
assert_lists_same(triggers, [])
# Trigger an exception when the entity is discovered
class FakeTrigger(TasmotaSwitchTriggerConfig):
"""Bad TasmotaSwitchTriggerConfig to cause exceptions."""
@property
def is_active(self):
return True
with patch(
"hatasmota.discovery.get_switch_triggers",
return_value=[
FakeTrigger(
event=None,
idx=1,
mac=None,
source=None,
subtype=None,
trigger_topic=None,
type=None,
)
],
):
async_fire_mqtt_message(
hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config)
)
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
assert_lists_same(triggers, [])
# Rediscover without exception
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
expected_triggers = [
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
]
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
assert_lists_same(triggers, expected_triggers)
async def test_update_remove_triggers(
hass, device_reg, entity_reg, mqtt_mock, setup_tasmota
):
"""Test triggers can be updated and removed."""
# Discover a device with toggle + hold trigger
config1 = copy.deepcopy(DEFAULT_CONFIG)
config1["swc"][0] = 5
mac = config1["mac"]
# Discover a device with toggle + double press trigger
config2 = copy.deepcopy(DEFAULT_CONFIG)
config2["swc"][0] = 8
# Discover a device with no trigger
config3 = copy.deepcopy(DEFAULT_CONFIG)
config3["swc"][0] = -1
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
expected_triggers1 = [
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_HOLD",
"type": "button_long_press",
"subtype": "switch_1",
},
]
expected_triggers2 = copy.deepcopy(expected_triggers1)
expected_triggers2[1]["type"] = "button_double_press"
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
for expected in expected_triggers1:
assert expected in triggers
# Update trigger
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config2))
await hass.async_block_till_done()
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
for expected in expected_triggers2:
assert expected in triggers
# Remove trigger
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config3))
await hass.async_block_till_done()
triggers = await async_get_device_automations(hass, "trigger", device_entry.id)
assert triggers == []
async def test_if_fires_on_mqtt_message(
hass, device_reg, calls, mqtt_mock, setup_tasmota
):
"""Test triggers firing."""
# Discover a device with 2 device triggers
config = copy.deepcopy(DEFAULT_CONFIG)
config["swc"][0] = 0
config["swc"][2] = 9
mac = config["mac"]
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("short_press")},
},
},
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_3_HOLD",
"subtype": "switch_3",
"type": "button_double_press",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("long_press")},
},
},
]
},
)
# Fake short press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["some"] == "short_press"
# Fake long press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH3T", '{"TRIG":"HOLD"}')
await hass.async_block_till_done()
assert len(calls) == 2
assert calls[1].data["some"] == "long_press"
async def test_if_fires_on_mqtt_message_late_discover(
hass, device_reg, calls, mqtt_mock, setup_tasmota
):
"""Test triggers firing of MQTT device triggers discovered after setup."""
# Discover a device without device triggers
config1 = copy.deepcopy(DEFAULT_CONFIG)
config1["swc"][0] = -1
mac = config1["mac"]
# Discover a device with 2 device triggers
config2 = copy.deepcopy(DEFAULT_CONFIG)
config2["swc"][0] = 0
config2["swc"][3] = 9
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("short_press")},
},
},
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_4_HOLD",
"type": "switch_4",
"subtype": "button_double_press",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("double_press")},
},
},
]
},
)
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config2))
await hass.async_block_till_done()
# Fake short press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["some"] == "short_press"
# Fake long press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH4T", '{"TRIG":"HOLD"}')
await hass.async_block_till_done()
assert len(calls) == 2
assert calls[1].data["some"] == "double_press"
async def test_if_fires_on_mqtt_message_after_update(
hass, device_reg, calls, mqtt_mock, setup_tasmota
):
"""Test triggers firing after update."""
# Discover a device with device trigger
config1 = copy.deepcopy(DEFAULT_CONFIG)
config2 = copy.deepcopy(DEFAULT_CONFIG)
config1["swc"][0] = 0
config2["swc"][0] = 0
config2["tp"][1] = "status"
mac = config1["mac"]
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("short_press")},
},
},
]
},
)
# Fake short press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
# Update the trigger with different topic
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config2))
await hass.async_block_till_done()
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
async_fire_mqtt_message(hass, "tasmota_49A3BC/status/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 2
# Update the trigger with same topic
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config2))
await hass.async_block_till_done()
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 2
async_fire_mqtt_message(hass, "tasmota_49A3BC/status/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 3
async def test_no_resubscribe_same_topic(hass, device_reg, mqtt_mock, setup_tasmota):
"""Test subscription to topics without change."""
# Discover a device with device trigger
config = copy.deepcopy(DEFAULT_CONFIG)
config["swc"][0] = 0
mac = config["mac"]
mqtt_mock.async_subscribe.reset_mock()
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("short_press")},
},
},
]
},
)
call_count = mqtt_mock.async_subscribe.call_count
assert call_count == 1
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
assert mqtt_mock.async_subscribe.call_count == call_count
async def test_not_fires_on_mqtt_message_after_remove_by_mqtt(
hass, device_reg, calls, mqtt_mock, setup_tasmota
):
"""Test triggers not firing after removal."""
# Discover a device with device trigger
config = copy.deepcopy(DEFAULT_CONFIG)
config["swc"][0] = 0
mac = config["mac"]
mqtt_mock.async_subscribe.reset_mock()
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("short_press")},
},
},
]
},
)
# Fake short press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
# Remove the trigger
config["swc"][0] = -1
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
# Rediscover the trigger
config["swc"][0] = 0
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 2
async def test_not_fires_on_mqtt_message_after_remove_from_registry(
hass, device_reg, calls, mqtt_mock, setup_tasmota
):
"""Test triggers not firing after removal."""
# Discover a device with device trigger
config = copy.deepcopy(DEFAULT_CONFIG)
config["swc"][0] = 0
mac = config["mac"]
mqtt_mock.async_subscribe.reset_mock()
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
"action": {
"service": "test.automation",
"data_template": {"some": ("short_press")},
},
},
]
},
)
# Fake short press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
# Remove the device
device_reg.async_remove_device(device_entry.id)
await hass.async_block_till_done()
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
async def test_attach_remove(hass, device_reg, mqtt_mock, setup_tasmota):
"""Test attach and removal of trigger."""
# Discover a device with device trigger
config = copy.deepcopy(DEFAULT_CONFIG)
config["swc"][0] = 0
mac = config["mac"]
mqtt_mock.async_subscribe.reset_mock()
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
calls = []
def callback(trigger, context):
calls.append(trigger["trigger"]["description"])
remove = await async_attach_trigger(
hass,
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
callback,
None,
)
# Fake short press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0] == "event 'tasmota_event'"
# Remove the trigger
remove()
await hass.async_block_till_done()
# Verify the triggers are no longer active
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
async def test_attach_remove_late(hass, device_reg, mqtt_mock, setup_tasmota):
"""Test attach and removal of trigger."""
# Discover a device without device triggers
config1 = copy.deepcopy(DEFAULT_CONFIG)
config1["swc"][0] = -1
mac = config1["mac"]
# Discover a device with device triggers
config2 = copy.deepcopy(DEFAULT_CONFIG)
config2["swc"][0] = 0
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
calls = []
def callback(trigger, context):
calls.append(trigger["trigger"]["description"])
remove = await async_attach_trigger(
hass,
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
callback,
None,
)
# Fake short press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 0
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config2))
await hass.async_block_till_done()
# Fake short press.
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0] == "event 'tasmota_event'"
# Remove the trigger
remove()
await hass.async_block_till_done()
# Verify the triggers are no longer active
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 1
async def test_attach_remove_late2(hass, device_reg, mqtt_mock, setup_tasmota):
"""Test attach and removal of trigger."""
# Discover a device without device triggers
config1 = copy.deepcopy(DEFAULT_CONFIG)
config1["swc"][0] = -1
mac = config1["mac"]
# Discover a device with device triggers
config2 = copy.deepcopy(DEFAULT_CONFIG)
config2["swc"][0] = 0
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
calls = []
def callback(trigger, context):
calls.append(trigger["trigger"]["description"])
remove = await async_attach_trigger(
hass,
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
callback,
None,
)
# Remove the trigger
remove()
await hass.async_block_till_done()
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
# Verify the triggers is not active
async_fire_mqtt_message(hass, "tasmota_49A3BC/stat/SWITCH1T", '{"TRIG":"TOGGLE"}')
await hass.async_block_till_done()
assert len(calls) == 0
async def test_attach_remove_unknown1(hass, device_reg, mqtt_mock, setup_tasmota):
"""Test attach and removal of unknown trigger."""
# Discover a device without device triggers
config1 = copy.deepcopy(DEFAULT_CONFIG)
config1["swc"][0] = -1
mac = config1["mac"]
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
remove = await async_attach_trigger(
hass,
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
None,
None,
)
# Remove the trigger
remove()
await hass.async_block_till_done()
async def test_attach_unknown_remove_device_from_registry(
hass, device_reg, mqtt_mock, setup_tasmota
):
"""Test attach and removal of device with unknown trigger."""
# Discover a device without device triggers
config1 = copy.deepcopy(DEFAULT_CONFIG)
config1["swc"][0] = -1
mac = config1["mac"]
# Discover a device with device triggers
config2 = copy.deepcopy(DEFAULT_CONFIG)
config2["swc"][0] = 0
# Discovery a device with device triggers to load Tasmota device trigger integration
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config2))
await hass.async_block_till_done()
# Forget the trigger
async_fire_mqtt_message(hass, f"{DEFAULT_PREFIX}/{mac}/config", json.dumps(config1))
await hass.async_block_till_done()
device_entry = device_reg.async_get_device(set(), {("mac", mac)})
await async_attach_trigger(
hass,
{
"platform": "device",
"domain": DOMAIN,
"device_id": device_entry.id,
"discovery_id": "00000049A3BC_switch_1_TOGGLE",
"type": "button_short_press",
"subtype": "switch_1",
},
None,
None,
)
# Remove the device
device_reg.async_remove_device(device_entry.id)
await hass.async_block_till_done()