diff --git a/homeassistant/components/mqtt/trigger.py b/homeassistant/components/mqtt/trigger.py index 3530538122d..c10e539f8ec 100644 --- a/homeassistant/components/mqtt/trigger.py +++ b/homeassistant/components/mqtt/trigger.py @@ -1,21 +1,31 @@ """Offer MQTT listening automation rules.""" +from __future__ import annotations + +from collections.abc import Callable from contextlib import suppress import logging +from typing import Any import voluptuous as vol from homeassistant.const import CONF_PAYLOAD, CONF_PLATFORM, CONF_VALUE_TEMPLATE from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback -from homeassistant.helpers import config_validation as cv, template +from homeassistant.helpers import config_validation as cv from homeassistant.helpers.json import json_loads -from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo -from homeassistant.helpers.typing import ConfigType +from homeassistant.helpers.template import Template +from homeassistant.helpers.trigger import TriggerActionType, TriggerData, TriggerInfo +from homeassistant.helpers.typing import ConfigType, TemplateVarsType from .. import mqtt from .const import CONF_ENCODING, CONF_QOS, CONF_TOPIC, DEFAULT_ENCODING, DEFAULT_QOS - -# mypy: allow-untyped-defs - +from .models import ( + MqttCommandTemplate, + MqttValueTemplate, + PayloadSentinel, + PublishPayloadType, + ReceiveMessage, + ReceivePayloadType, +) TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend( { @@ -40,43 +50,37 @@ async def async_attach_trigger( trigger_info: TriggerInfo, ) -> CALLBACK_TYPE: """Listen for state changes based on configuration.""" - trigger_data = trigger_info["trigger_data"] - topic = config[CONF_TOPIC] - wanted_payload = config.get(CONF_PAYLOAD) - value_template = config.get(CONF_VALUE_TEMPLATE) - encoding = config[CONF_ENCODING] or None - qos = config[CONF_QOS] + trigger_data: TriggerData = trigger_info["trigger_data"] + command_template: Callable[ + [PublishPayloadType, TemplateVarsType], PublishPayloadType + ] = MqttCommandTemplate(config.get(CONF_PAYLOAD), hass=hass).async_render + value_template: Callable[[ReceivePayloadType, str], ReceivePayloadType] + value_template = MqttValueTemplate( + config.get(CONF_VALUE_TEMPLATE), hass=hass + ).async_render_with_possible_json_value + encoding: str | None = config[CONF_ENCODING] or None + qos: int = config[CONF_QOS] job = HassJob(action) - variables = None + variables: TemplateVarsType | None = None if trigger_info: variables = trigger_info.get("variables") - template.attach(hass, wanted_payload) - if wanted_payload: - wanted_payload = wanted_payload.async_render( - variables, limited=True, parse_result=False - ) + wanted_payload = command_template(None, variables) - template.attach(hass, topic) - if isinstance(topic, template.Template): - topic = topic.async_render(variables, limited=True, parse_result=False) - topic = mqtt.util.valid_subscribe_topic(topic) - - template.attach(hass, value_template) + topic_template: Template = config[CONF_TOPIC] + topic_template.hass = hass + topic = topic_template.async_render(variables, limited=True, parse_result=False) + mqtt.util.valid_subscribe_topic(topic) @callback - def mqtt_automation_listener(mqttmsg): + def mqtt_automation_listener(mqttmsg: ReceiveMessage) -> None: """Listen for MQTT messages.""" - payload = mqttmsg.payload - - if value_template is not None: - payload = value_template.async_render_with_possible_json_value( - payload, - error_value=None, - ) - - if wanted_payload is None or wanted_payload == payload: - data = { + if wanted_payload is None or ( + (payload := value_template(mqttmsg.payload, PayloadSentinel.DEFAULT)) + and payload is not PayloadSentinel.DEFAULT + and wanted_payload == payload + ): + data: dict[str, Any] = { **trigger_data, "platform": "mqtt", "topic": mqttmsg.topic,