Refactor mqtt callbacks for cover (#118044)

This commit is contained in:
Jan Bouwhuis 2024-05-24 22:26:24 +02:00 committed by GitHub
parent 4b89443f62
commit 35a20d9c60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,6 +3,7 @@
from __future__ import annotations
from contextlib import suppress
from functools import partial
import logging
from typing import Any
@ -62,12 +63,7 @@ from .const import (
DEFAULT_POSITION_OPEN,
DEFAULT_RETAIN,
)
from .debug_info import log_messages
from .mixins import (
MqttEntity,
async_setup_entity_entry_helper,
write_state_on_attr_change,
)
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import MqttCommandTemplate, MqttValueTemplate, ReceiveMessage
from .schemas import MQTT_ENTITY_COMMON_SCHEMA
from .util import valid_publish_topic, valid_subscribe_topic
@ -360,125 +356,119 @@ class MqttCover(MqttEntity, CoverEntity):
self._attr_is_opening = state == STATE_OPENING
self._attr_is_closing = state == STATE_CLOSING
@callback
def _tilt_message_received(self, msg: ReceiveMessage) -> None:
"""Handle tilt updates."""
payload = self._tilt_status_template(msg.payload)
if not payload:
_LOGGER.debug("Ignoring empty tilt message from '%s'", msg.topic)
return
self.tilt_payload_received(payload)
@callback
def _state_message_received(self, msg: ReceiveMessage) -> None:
"""Handle new MQTT state messages."""
payload = self._value_template(msg.payload)
if not payload:
_LOGGER.debug("Ignoring empty state message from '%s'", msg.topic)
return
state: str
if payload == self._config[CONF_STATE_STOPPED]:
if self._config.get(CONF_GET_POSITION_TOPIC) is not None:
state = (
STATE_CLOSED
if self._attr_current_cover_position == DEFAULT_POSITION_CLOSED
else STATE_OPEN
)
else:
state = (
STATE_CLOSED
if self.state in [STATE_CLOSED, STATE_CLOSING]
else STATE_OPEN
)
elif payload == self._config[CONF_STATE_OPENING]:
state = STATE_OPENING
elif payload == self._config[CONF_STATE_CLOSING]:
state = STATE_CLOSING
elif payload == self._config[CONF_STATE_OPEN]:
state = STATE_OPEN
elif payload == self._config[CONF_STATE_CLOSED]:
state = STATE_CLOSED
else:
_LOGGER.warning(
(
"Payload is not supported (e.g. open, closed, opening, closing,"
" stopped): %s"
),
payload,
)
return
self._update_state(state)
@callback
def _position_message_received(self, msg: ReceiveMessage) -> None:
"""Handle new MQTT position messages."""
payload: ReceivePayloadType = self._get_position_template(msg.payload)
payload_dict: Any = None
if not payload:
_LOGGER.debug("Ignoring empty position message from '%s'", msg.topic)
return
with suppress(*JSON_DECODE_EXCEPTIONS):
payload_dict = json_loads(payload)
if payload_dict and isinstance(payload_dict, dict):
if "position" not in payload_dict:
_LOGGER.warning(
"Template (position_template) returned JSON without position"
" attribute"
)
return
if "tilt_position" in payload_dict:
if not self._config.get(CONF_TILT_STATE_OPTIMISTIC):
# reset forced set tilt optimistic
self._tilt_optimistic = False
self.tilt_payload_received(payload_dict["tilt_position"])
payload = payload_dict["position"]
try:
percentage_payload = ranged_value_to_percentage(
self._pos_range, float(payload)
)
except ValueError:
_LOGGER.warning("Payload '%s' is not numeric", payload)
return
self._attr_current_cover_position = min(100, max(0, percentage_payload))
if self._config.get(CONF_STATE_TOPIC) is None:
self._update_state(
STATE_CLOSED if self.current_cover_position == 0 else STATE_OPEN
)
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics = {}
@callback
@log_messages(self.hass, self.entity_id)
@write_state_on_attr_change(self, {"_attr_current_cover_tilt_position"})
def tilt_message_received(msg: ReceiveMessage) -> None:
"""Handle tilt updates."""
payload = self._tilt_status_template(msg.payload)
if not payload:
_LOGGER.debug("Ignoring empty tilt message from '%s'", msg.topic)
return
self.tilt_payload_received(payload)
@callback
@log_messages(self.hass, self.entity_id)
@write_state_on_attr_change(
self, {"_attr_is_closed", "_attr_is_closing", "_attr_is_opening"}
)
def state_message_received(msg: ReceiveMessage) -> None:
"""Handle new MQTT state messages."""
payload = self._value_template(msg.payload)
if not payload:
_LOGGER.debug("Ignoring empty state message from '%s'", msg.topic)
return
state: str
if payload == self._config[CONF_STATE_STOPPED]:
if self._config.get(CONF_GET_POSITION_TOPIC) is not None:
state = (
STATE_CLOSED
if self._attr_current_cover_position == DEFAULT_POSITION_CLOSED
else STATE_OPEN
)
else:
state = (
STATE_CLOSED
if self.state in [STATE_CLOSED, STATE_CLOSING]
else STATE_OPEN
)
elif payload == self._config[CONF_STATE_OPENING]:
state = STATE_OPENING
elif payload == self._config[CONF_STATE_CLOSING]:
state = STATE_CLOSING
elif payload == self._config[CONF_STATE_OPEN]:
state = STATE_OPEN
elif payload == self._config[CONF_STATE_CLOSED]:
state = STATE_CLOSED
else:
_LOGGER.warning(
(
"Payload is not supported (e.g. open, closed, opening, closing,"
" stopped): %s"
),
payload,
)
return
self._update_state(state)
@callback
@log_messages(self.hass, self.entity_id)
@write_state_on_attr_change(
self,
{
"_attr_current_cover_position",
"_attr_current_cover_tilt_position",
"_attr_is_closed",
"_attr_is_closing",
"_attr_is_opening",
},
)
def position_message_received(msg: ReceiveMessage) -> None:
"""Handle new MQTT position messages."""
payload: ReceivePayloadType = self._get_position_template(msg.payload)
payload_dict: Any = None
if not payload:
_LOGGER.debug("Ignoring empty position message from '%s'", msg.topic)
return
with suppress(*JSON_DECODE_EXCEPTIONS):
payload_dict = json_loads(payload)
if payload_dict and isinstance(payload_dict, dict):
if "position" not in payload_dict:
_LOGGER.warning(
"Template (position_template) returned JSON without position"
" attribute"
)
return
if "tilt_position" in payload_dict:
if not self._config.get(CONF_TILT_STATE_OPTIMISTIC):
# reset forced set tilt optimistic
self._tilt_optimistic = False
self.tilt_payload_received(payload_dict["tilt_position"])
payload = payload_dict["position"]
try:
percentage_payload = ranged_value_to_percentage(
self._pos_range, float(payload)
)
except ValueError:
_LOGGER.warning("Payload '%s' is not numeric", payload)
return
self._attr_current_cover_position = min(100, max(0, percentage_payload))
if self._config.get(CONF_STATE_TOPIC) is None:
self._update_state(
STATE_CLOSED if self.current_cover_position == 0 else STATE_OPEN
)
if self._config.get(CONF_GET_POSITION_TOPIC):
topics["get_position_topic"] = {
"topic": self._config.get(CONF_GET_POSITION_TOPIC),
"msg_callback": position_message_received,
"msg_callback": partial(
self._message_callback,
self._position_message_received,
{
"_attr_current_cover_position",
"_attr_current_cover_tilt_position",
"_attr_is_closed",
"_attr_is_closing",
"_attr_is_opening",
},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
}
@ -486,7 +476,12 @@ class MqttCover(MqttEntity, CoverEntity):
if self._config.get(CONF_STATE_TOPIC):
topics["state_topic"] = {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": state_message_received,
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{"_attr_is_closed", "_attr_is_closing", "_attr_is_opening"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
}
@ -494,7 +489,12 @@ class MqttCover(MqttEntity, CoverEntity):
if self._config.get(CONF_TILT_STATUS_TOPIC) is not None:
topics["tilt_status_topic"] = {
"topic": self._config.get(CONF_TILT_STATUS_TOPIC),
"msg_callback": tilt_message_received,
"msg_callback": partial(
self._message_callback,
self._tilt_message_received,
{"_attr_current_cover_tilt_position"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
}