Refactor mqtt callbacks for valve (#118140)

This commit is contained in:
Jan Bouwhuis 2024-05-25 23:34:56 +02:00 committed by GitHub
parent 9be829ba1f
commit 991d6d92db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,6 +3,7 @@
from __future__ import annotations
from contextlib import suppress
from functools import partial
import logging
from typing import Any
@ -61,12 +62,7 @@ from .const import (
DEFAULT_RETAIN,
PAYLOAD_NONE,
)
from .debug_info import log_messages
from .mixins import (
MqttEntity,
async_setup_entity_entry_helper,
write_state_on_attr_change,
)
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import MqttCommandTemplate, MqttValueTemplate, ReceiveMessage
from .schemas import MQTT_ENTITY_COMMON_SCHEMA
from .util import valid_publish_topic, valid_subscribe_topic
@ -302,65 +298,63 @@ class MqttValve(MqttEntity, ValveEntity):
return
self._update_state(state)
@callback
def _state_message_received(self, msg: ReceiveMessage) -> None:
"""Handle new MQTT state messages."""
payload = self._value_template(msg.payload)
payload_dict: Any = None
position_payload: Any = payload
state_payload: Any = payload
if not payload:
_LOGGER.debug("Ignoring empty state message from '%s'", msg.topic)
return
with suppress(*JSON_DECODE_EXCEPTIONS):
payload_dict = json_loads(payload)
if isinstance(payload_dict, dict):
if self.reports_position and "position" not in payload_dict:
_LOGGER.warning(
"Missing required `position` attribute in json payload "
"on topic '%s', got: %s",
msg.topic,
payload,
)
return
if not self.reports_position and "state" not in payload_dict:
_LOGGER.warning(
"Missing required `state` attribute in json payload "
" on topic '%s', got: %s",
msg.topic,
payload,
)
return
position_payload = payload_dict.get("position")
state_payload = payload_dict.get("state")
if self._config[CONF_REPORTS_POSITION]:
self._process_position_valve_update(msg, position_payload, state_payload)
else:
self._process_binary_valve_update(msg, state_payload)
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics = {}
@callback
@log_messages(self.hass, self.entity_id)
@write_state_on_attr_change(
self,
{
"_attr_current_valve_position",
"_attr_is_closed",
"_attr_is_closing",
"_attr_is_opening",
},
)
def state_message_received(msg: ReceiveMessage) -> None:
"""Handle new MQTT state messages."""
payload = self._value_template(msg.payload)
payload_dict: Any = None
position_payload: Any = payload
state_payload: Any = payload
if not payload:
_LOGGER.debug("Ignoring empty state message from '%s'", msg.topic)
return
with suppress(*JSON_DECODE_EXCEPTIONS):
payload_dict = json_loads(payload)
if isinstance(payload_dict, dict):
if self.reports_position and "position" not in payload_dict:
_LOGGER.warning(
"Missing required `position` attribute in json payload "
"on topic '%s', got: %s",
msg.topic,
payload,
)
return
if not self.reports_position and "state" not in payload_dict:
_LOGGER.warning(
"Missing required `state` attribute in json payload "
" on topic '%s', got: %s",
msg.topic,
payload,
)
return
position_payload = payload_dict.get("position")
state_payload = payload_dict.get("state")
if self._config[CONF_REPORTS_POSITION]:
self._process_position_valve_update(
msg, position_payload, state_payload
)
else:
self._process_binary_valve_update(msg, state_payload)
if self._config.get(CONF_STATE_TOPIC):
topics["state_topic"] = {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": state_message_received,
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{
"_attr_current_valve_position",
"_attr_is_closed",
"_attr_is_closing",
"_attr_is_opening",
},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
}