Simplify subscription mqtt entity platforms (#118177)

This commit is contained in:
Jan Bouwhuis 2024-05-26 21:25:54 +02:00 committed by GitHub
parent 008b56b4dd
commit b7f1f805fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 241 additions and 749 deletions

View File

@ -2,7 +2,6 @@
from __future__ import annotations
from functools import partial
import logging
import voluptuous as vol
@ -25,7 +24,7 @@ from homeassistant.const import (
STATE_ALARM_PENDING,
STATE_ALARM_TRIGGERED,
)
from homeassistant.core import HassJobType, HomeAssistant
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
@ -35,8 +34,6 @@ from .config import DEFAULT_RETAIN, MQTT_BASE_SCHEMA
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_RETAIN,
CONF_STATE_TOPIC,
CONF_SUPPORTED_FEATURES,
@ -203,26 +200,11 @@ class MqttAlarm(MqttEntity, alarm.AlarmControlPanelEntity):
return
self._attr_state = str(payload)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
"state_topic": {
"topic": self._config[CONF_STATE_TOPIC],
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{"_attr_state"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
},
self.add_subscription(
CONF_STATE_TOPIC, self._state_message_received, {"_attr_state"}
)
async def _subscribe_topics(self) -> None:

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from datetime import datetime, timedelta
from functools import partial
import logging
from typing import Any
@ -26,7 +25,7 @@ from homeassistant.const import (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import CALLBACK_TYPE, HassJobType, HomeAssistant, callback
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
import homeassistant.helpers.event as evt
@ -37,7 +36,7 @@ from homeassistant.util import dt as dt_util
from . import subscription
from .config import MQTT_RO_SCHEMA
from .const import CONF_ENCODING, CONF_QOS, CONF_STATE_TOPIC, PAYLOAD_NONE
from .const import CONF_STATE_TOPIC, PAYLOAD_NONE
from .mixins import MqttAvailabilityMixin, MqttEntity, async_setup_entity_entry_helper
from .models import MqttValueTemplate, ReceiveMessage
from .schemas import MQTT_ENTITY_COMMON_SCHEMA
@ -231,26 +230,11 @@ class MqttBinarySensor(MqttEntity, BinarySensorEntity, RestoreEntity):
self.hass, off_delay, self._off_delay_listener
)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
"state_topic": {
"topic": self._config[CONF_STATE_TOPIC],
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{"_attr_is_on", "_expired"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
},
self.add_subscription(
CONF_STATE_TOPIC, self._state_message_received, {"_attr_is_on", "_expired"}
)
async def _subscribe_topics(self) -> None:

View File

@ -8,7 +8,7 @@ from homeassistant.components import button
from homeassistant.components.button import DEVICE_CLASSES_SCHEMA, ButtonEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DEVICE_CLASS, CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
@ -73,6 +73,7 @@ class MqttButton(MqttEntity, ButtonEntity):
).async_render
self._attr_device_class = self._config.get(CONF_DEVICE_CLASS)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from base64 import b64decode
from functools import partial
import logging
from typing import TYPE_CHECKING
@ -13,14 +12,14 @@ from homeassistant.components import camera
from homeassistant.components.camera import Camera
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import subscription
from .config import MQTT_BASE_SCHEMA
from .const import CONF_QOS, CONF_TOPIC
from .const import CONF_TOPIC
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import ReceiveMessage
from .schemas import MQTT_ENTITY_COMMON_SCHEMA
@ -107,26 +106,12 @@ class MqttCamera(MqttEntity, Camera):
assert isinstance(msg.payload, bytes)
self._last_image = msg.payload
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
"state_topic": {
"topic": self._config[CONF_TOPIC],
"msg_callback": partial(
self._message_callback,
self._image_received,
None,
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": None,
"job_type": HassJobType.Callback,
}
},
self.add_subscription(
CONF_TOPIC, self._image_received, None, disable_encoding=True
)
async def _subscribe_topics(self) -> None:

View File

@ -43,7 +43,7 @@ from homeassistant.const import (
PRECISION_WHOLE,
UnitOfTemperature,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.template import Template
@ -59,7 +59,6 @@ from .const import (
CONF_CURRENT_HUMIDITY_TOPIC,
CONF_CURRENT_TEMP_TEMPLATE,
CONF_CURRENT_TEMP_TOPIC,
CONF_ENCODING,
CONF_MODE_COMMAND_TEMPLATE,
CONF_MODE_COMMAND_TOPIC,
CONF_MODE_LIST,
@ -68,7 +67,6 @@ from .const import (
CONF_POWER_COMMAND_TEMPLATE,
CONF_POWER_COMMAND_TOPIC,
CONF_PRECISION,
CONF_QOS,
CONF_RETAIN,
CONF_TEMP_COMMAND_TEMPLATE,
CONF_TEMP_COMMAND_TOPIC,
@ -409,29 +407,6 @@ class MqttTemperatureControlEntity(MqttEntity, ABC):
_command_templates: dict[str, Callable[[PublishPayloadType], PublishPayloadType]]
_value_templates: dict[str, Callable[[ReceivePayloadType], ReceivePayloadType]]
def add_subscription(
self,
topics: dict[str, dict[str, Any]],
topic: str,
msg_callback: Callable[[ReceiveMessage], None],
tracked_attributes: set[str],
) -> None:
"""Add a subscription."""
qos: int = self._config[CONF_QOS]
if topic in self._topic and self._topic[topic] is not None:
topics[topic] = {
"topic": self._topic[topic],
"msg_callback": partial(
self._message_callback,
msg_callback,
tracked_attributes,
),
"entity_id": self.entity_id,
"qos": qos,
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
def render_template(
self, msg: ReceiveMessage, template_name: str
) -> ReceivePayloadType:
@ -462,11 +437,9 @@ class MqttTemperatureControlEntity(MqttEntity, ABC):
@callback
def prepare_subscribe_topics(
self,
topics: dict[str, dict[str, Any]],
) -> None:
"""(Re)Subscribe to topics."""
self.add_subscription(
topics,
CONF_CURRENT_TEMP_TOPIC,
partial(
self.handle_climate_attribute_received,
@ -476,7 +449,6 @@ class MqttTemperatureControlEntity(MqttEntity, ABC):
{"_attr_current_temperature"},
)
self.add_subscription(
topics,
CONF_TEMP_STATE_TOPIC,
partial(
self.handle_climate_attribute_received,
@ -486,7 +458,6 @@ class MqttTemperatureControlEntity(MqttEntity, ABC):
{"_attr_target_temperature"},
)
self.add_subscription(
topics,
CONF_TEMP_LOW_STATE_TOPIC,
partial(
self.handle_climate_attribute_received,
@ -496,7 +467,6 @@ class MqttTemperatureControlEntity(MqttEntity, ABC):
{"_attr_target_temperature_low"},
)
self.add_subscription(
topics,
CONF_TEMP_HIGH_STATE_TOPIC,
partial(
self.handle_climate_attribute_received,
@ -506,10 +476,6 @@ class MqttTemperatureControlEntity(MqttEntity, ABC):
{"_attr_target_temperature_high"},
)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
subscription.async_subscribe_topics_internal(self.hass, self._sub_state)
@ -761,16 +727,13 @@ class MqttClimate(MqttTemperatureControlEntity, ClimateEntity):
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, dict[str, Any]] = {}
# add subscriptions for MqttClimate
self.add_subscription(
topics,
CONF_ACTION_TOPIC,
self._handle_action_received,
{"_attr_hvac_action"},
)
self.add_subscription(
topics,
CONF_CURRENT_HUMIDITY_TOPIC,
partial(
self.handle_climate_attribute_received,
@ -780,7 +743,6 @@ class MqttClimate(MqttTemperatureControlEntity, ClimateEntity):
{"_attr_current_humidity"},
)
self.add_subscription(
topics,
CONF_HUMIDITY_STATE_TOPIC,
partial(
self.handle_climate_attribute_received,
@ -790,7 +752,6 @@ class MqttClimate(MqttTemperatureControlEntity, ClimateEntity):
{"_attr_target_humidity"},
)
self.add_subscription(
topics,
CONF_MODE_STATE_TOPIC,
partial(
self._handle_mode_received,
@ -801,7 +762,6 @@ class MqttClimate(MqttTemperatureControlEntity, ClimateEntity):
{"_attr_hvac_mode"},
)
self.add_subscription(
topics,
CONF_FAN_MODE_STATE_TOPIC,
partial(
self._handle_mode_received,
@ -812,7 +772,6 @@ class MqttClimate(MqttTemperatureControlEntity, ClimateEntity):
{"_attr_fan_mode"},
)
self.add_subscription(
topics,
CONF_SWING_MODE_STATE_TOPIC,
partial(
self._handle_mode_received,
@ -823,13 +782,12 @@ class MqttClimate(MqttTemperatureControlEntity, ClimateEntity):
{"_attr_swing_mode"},
)
self.add_subscription(
topics,
CONF_PRESET_MODE_STATE_TOPIC,
self._handle_preset_mode_received,
{"_attr_preset_mode"},
)
self.prepare_subscribe_topics(topics)
# add subscriptions for MqttTemperatureControlEntity
self.prepare_subscribe_topics()
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperatures."""

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from contextlib import suppress
from functools import partial
import logging
from typing import Any
@ -28,7 +27,7 @@ from homeassistant.const import (
STATE_OPEN,
STATE_OPENING,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.service_info.mqtt import ReceivePayloadType
@ -43,13 +42,11 @@ from . import subscription
from .config import MQTT_BASE_SCHEMA
from .const import (
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_PAYLOAD_CLOSE,
CONF_PAYLOAD_OPEN,
CONF_PAYLOAD_STOP,
CONF_POSITION_CLOSED,
CONF_POSITION_OPEN,
CONF_QOS,
CONF_RETAIN,
CONF_STATE_CLOSED,
CONF_STATE_CLOSING,
@ -457,60 +454,29 @@ class MqttCover(MqttEntity, CoverEntity):
STATE_CLOSED if self.current_cover_position == 0 else STATE_OPEN
)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics = {}
if self._config.get(CONF_GET_POSITION_TOPIC):
topics["get_position_topic"] = {
"topic": self._config.get(CONF_GET_POSITION_TOPIC),
"msg_callback": partial(
self._message_callback,
self._position_message_received,
{
"_attr_current_cover_position",
"_attr_current_cover_tilt_position",
"_attr_is_closed",
"_attr_is_closing",
"_attr_is_opening",
},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
if self._config.get(CONF_STATE_TOPIC):
topics["state_topic"] = {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{"_attr_is_closed", "_attr_is_closing", "_attr_is_opening"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
if self._config.get(CONF_TILT_STATUS_TOPIC) is not None:
topics["tilt_status_topic"] = {
"topic": self._config.get(CONF_TILT_STATUS_TOPIC),
"msg_callback": partial(
self._message_callback,
self._tilt_message_received,
{"_attr_current_cover_tilt_position"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
self.add_subscription(
CONF_GET_POSITION_TOPIC,
self._position_message_received,
{
"_attr_current_cover_position",
"_attr_current_cover_tilt_position",
"_attr_is_closed",
"_attr_is_closing",
"_attr_is_opening",
},
)
self.add_subscription(
CONF_STATE_TOPIC,
self._state_message_received,
{"_attr_is_closed", "_attr_is_closing", "_attr_is_opening"},
)
self.add_subscription(
CONF_TILT_STATUS_TOPIC,
self._tilt_message_received,
{"_attr_current_cover_tilt_position"},
)
async def _subscribe_topics(self) -> None:

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
from typing import TYPE_CHECKING
@ -25,14 +24,14 @@ from homeassistant.const import (
STATE_HOME,
STATE_NOT_HOME,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
from . import subscription
from .config import MQTT_BASE_SCHEMA
from .const import CONF_PAYLOAD_RESET, CONF_QOS, CONF_STATE_TOPIC
from .const import CONF_PAYLOAD_RESET, CONF_STATE_TOPIC
from .mixins import CONF_JSON_ATTRS_TOPIC, MqttEntity, async_setup_entity_entry_helper
from .models import MqttValueTemplate, ReceiveMessage, ReceivePayloadType
from .schemas import MQTT_ENTITY_COMMON_SCHEMA
@ -136,28 +135,11 @@ class MqttDeviceTracker(MqttEntity, TrackerEntity):
assert isinstance(msg.payload, str)
self._location_name = msg.payload
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
state_topic: str | None = self._config.get(CONF_STATE_TOPIC)
if state_topic is None:
return
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
"state_topic": {
"topic": state_topic,
"msg_callback": partial(
self._message_callback,
self._tracker_message_received,
{"_location_name"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"job_type": HassJobType.Callback,
}
},
self.add_subscription(
CONF_STATE_TOPIC, self._tracker_message_received, {"_location_name"}
)
@property

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
from typing import Any
@ -17,7 +16,7 @@ from homeassistant.components.event import (
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DEVICE_CLASS, CONF_NAME, CONF_VALUE_TEMPLATE
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
@ -25,13 +24,7 @@ from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads_object
from . import subscription
from .config import MQTT_RO_SCHEMA
from .const import (
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
PAYLOAD_EMPTY_JSON,
PAYLOAD_NONE,
)
from .const import CONF_STATE_TOPIC, PAYLOAD_EMPTY_JSON, PAYLOAD_NONE
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import (
DATA_MQTT,
@ -186,26 +179,10 @@ class MqttEvent(MqttEntity, EventEntity):
mqtt_data = self.hass.data[DATA_MQTT]
mqtt_data.state_write_requests.write_state_request(self)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, dict[str, Any]] = {}
topics["state_topic"] = {
"topic": self._config[CONF_STATE_TOPIC],
"msg_callback": partial(
self._message_callback,
self._event_received,
None,
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
)
self.add_subscription(CONF_STATE_TOPIC, self._event_received, None)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
import math
from typing import Any
@ -27,7 +26,7 @@ from homeassistant.const import (
CONF_PAYLOAD_ON,
CONF_STATE,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.template import Template
@ -43,15 +42,12 @@ from .config import MQTT_RW_SCHEMA
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
CONF_STATE_VALUE_TEMPLATE,
PAYLOAD_NONE,
)
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import (
MessageCallbackType,
MqttCommandTemplate,
MqttValueTemplate,
PublishPayloadType,
@ -429,52 +425,30 @@ class MqttFan(MqttEntity, FanEntity):
return
self._attr_current_direction = str(direction)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, Any] = {}
def add_subscribe_topic(
topic: str, msg_callback: MessageCallbackType, tracked_attributes: set[str]
) -> bool:
"""Add a topic to subscribe to."""
if has_topic := self._topic[topic] is not None:
topics[topic] = {
"topic": self._topic[topic],
"msg_callback": partial(
self._message_callback, msg_callback, tracked_attributes
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
return has_topic
add_subscribe_topic(CONF_STATE_TOPIC, self._state_received, {"_attr_is_on"})
add_subscribe_topic(
self.add_subscription(CONF_STATE_TOPIC, self._state_received, {"_attr_is_on"})
self.add_subscription(
CONF_PERCENTAGE_STATE_TOPIC, self._percentage_received, {"_attr_percentage"}
)
add_subscribe_topic(
self.add_subscription(
CONF_PRESET_MODE_STATE_TOPIC,
self._preset_mode_received,
{"_attr_preset_mode"},
)
if add_subscribe_topic(
if self.add_subscription(
CONF_OSCILLATION_STATE_TOPIC,
self._oscillation_received,
{"_attr_oscillating"},
):
self._attr_oscillating = False
add_subscribe_topic(
self.add_subscription(
CONF_DIRECTION_STATE_TOPIC,
self._direction_received,
{"_attr_current_direction"},
)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
subscription.async_subscribe_topics_internal(self.hass, self._sub_state)

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
from typing import Any
@ -30,7 +29,7 @@ from homeassistant.const import (
CONF_PAYLOAD_ON,
CONF_STATE,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.template import Template
@ -45,8 +44,6 @@ from .const import (
CONF_COMMAND_TOPIC,
CONF_CURRENT_HUMIDITY_TEMPLATE,
CONF_CURRENT_HUMIDITY_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
CONF_STATE_VALUE_TEMPLATE,
PAYLOAD_NONE,
@ -274,27 +271,6 @@ class MqttHumidifier(MqttEntity, HumidifierEntity):
for key, tpl in value_templates.items()
}
def add_subscription(
self,
topics: dict[str, dict[str, Any]],
topic: str,
msg_callback: Callable[[ReceiveMessage], None],
tracked_attributes: set[str],
) -> None:
"""Add a subscription."""
qos: int = self._config[CONF_QOS]
if topic in self._topic and self._topic[topic] is not None:
topics[topic] = {
"topic": self._topic[topic],
"msg_callback": partial(
self._message_callback, msg_callback, tracked_attributes
),
"entity_id": self.entity_id,
"qos": qos,
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
@callback
def _state_received(self, msg: ReceiveMessage) -> None:
"""Handle new received MQTT message."""
@ -415,34 +391,25 @@ class MqttHumidifier(MqttEntity, HumidifierEntity):
self._attr_mode = mode
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, Any] = {}
self.add_subscription(CONF_STATE_TOPIC, self._state_received, {"_attr_is_on"})
self.add_subscription(
topics, CONF_STATE_TOPIC, self._state_received, {"_attr_is_on"}
CONF_ACTION_TOPIC, self._action_received, {"_attr_action"}
)
self.add_subscription(
topics, CONF_ACTION_TOPIC, self._action_received, {"_attr_action"}
)
self.add_subscription(
topics,
CONF_CURRENT_HUMIDITY_TOPIC,
self._current_humidity_received,
{"_attr_current_humidity"},
)
self.add_subscription(
topics,
CONF_TARGET_HUMIDITY_STATE_TOPIC,
self._target_humidity_received,
{"_attr_target_humidity"},
)
self.add_subscription(
topics, CONF_MODE_STATE_TOPIC, self._mode_received, {"_attr_mode"}
)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
CONF_MODE_STATE_TOPIC, self._mode_received, {"_attr_mode"}
)
async def _subscribe_topics(self) -> None:

View File

@ -5,7 +5,6 @@ from __future__ import annotations
from base64 import b64decode
import binascii
from collections.abc import Callable
from functools import partial
import logging
from typing import TYPE_CHECKING, Any
@ -16,7 +15,7 @@ from homeassistant.components import image
from homeassistant.components.image import DEFAULT_CONTENT_TYPE, ImageEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.httpx_client import get_async_client
@ -26,11 +25,9 @@ from homeassistant.util import dt as dt_util
from . import subscription
from .config import MQTT_BASE_SCHEMA
from .const import CONF_ENCODING, CONF_QOS
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import (
DATA_MQTT,
MessageCallbackType,
MqttValueTemplate,
MqttValueTemplateException,
ReceiveMessage,
@ -182,35 +179,14 @@ class MqttImage(MqttEntity, ImageEntity):
self._cached_image = None
self.hass.data[DATA_MQTT].state_write_requests.write_state_request(self)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, Any] = {}
def add_subscribe_topic(topic: str, msg_callback: MessageCallbackType) -> bool:
"""Add a topic to subscribe to."""
encoding: str | None
encoding = (
None
if CONF_IMAGE_TOPIC in self._config
else self._config[CONF_ENCODING] or None
)
if has_topic := self._topic[topic] is not None:
topics[topic] = {
"topic": self._topic[topic],
"msg_callback": partial(self._message_callback, msg_callback, None),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": encoding,
"job_type": HassJobType.Callback,
}
return has_topic
add_subscribe_topic(CONF_IMAGE_TOPIC, self._image_data_received)
add_subscribe_topic(CONF_URL_TOPIC, self._image_from_url_request_received)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
self.add_subscription(
CONF_IMAGE_TOPIC, self._image_data_received, None, disable_encoding=True
)
self.add_subscription(
CONF_URL_TOPIC, self._image_from_url_request_received, None
)
async def _subscribe_topics(self) -> None:

View File

@ -4,7 +4,6 @@ from __future__ import annotations
from collections.abc import Callable
import contextlib
from functools import partial
import logging
import voluptuous as vol
@ -17,7 +16,7 @@ from homeassistant.components.lawn_mower import (
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME, CONF_OPTIMISTIC
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.restore_state import RestoreEntity
@ -25,13 +24,7 @@ from homeassistant.helpers.typing import ConfigType
from . import subscription
from .config import MQTT_BASE_SCHEMA
from .const import (
CONF_ENCODING,
CONF_QOS,
CONF_RETAIN,
DEFAULT_OPTIMISTIC,
DEFAULT_RETAIN,
)
from .const import CONF_RETAIN, DEFAULT_OPTIMISTIC, DEFAULT_RETAIN
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import (
MqttCommandTemplate,
@ -172,30 +165,15 @@ class MqttLawnMower(MqttEntity, LawnMowerEntity, RestoreEntity):
)
return
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
if self._config.get(CONF_ACTIVITY_STATE_TOPIC) is None:
if not self.add_subscription(
CONF_ACTIVITY_STATE_TOPIC, self._message_received, {"_attr_activity"}
):
# Force into optimistic mode.
self._attr_assumed_state = True
return
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
CONF_ACTIVITY_STATE_TOPIC: {
"topic": self._config.get(CONF_ACTIVITY_STATE_TOPIC),
"msg_callback": partial(
self._message_callback,
self._message_received,
{"_attr_activity"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
},
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
from typing import Any, cast
@ -37,7 +36,7 @@ from homeassistant.const import (
CONF_PAYLOAD_ON,
STATE_ON,
)
from homeassistant.core import HassJobType, callback
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType
@ -47,15 +46,12 @@ from .. import subscription
from ..config import MQTT_RW_SCHEMA
from ..const import (
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
CONF_STATE_VALUE_TEMPLATE,
PAYLOAD_NONE,
)
from ..mixins import MqttEntity
from ..models import (
MessageCallbackType,
MqttCommandTemplate,
MqttValueTemplate,
PayloadSentinel,
@ -562,69 +558,50 @@ class MqttLight(MqttEntity, LightEntity, RestoreEntity):
self._attr_color_mode = ColorMode.XY
self._attr_xy_color = cast(tuple[float, float], xy_color)
@callback
def _prepare_subscribe_topics(self) -> None: # noqa: C901
"""(Re)Subscribe to topics."""
topics: dict[str, dict[str, Any]] = {}
def add_topic(
topic: str, msg_callback: MessageCallbackType, tracked_attributes: set[str]
) -> None:
"""Add a topic."""
if self._topic[topic] is not None:
topics[topic] = {
"topic": self._topic[topic],
"msg_callback": partial(
self._message_callback, msg_callback, tracked_attributes
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
add_topic(CONF_STATE_TOPIC, self._state_received, {"_attr_is_on"})
add_topic(
self.add_subscription(CONF_STATE_TOPIC, self._state_received, {"_attr_is_on"})
self.add_subscription(
CONF_BRIGHTNESS_STATE_TOPIC, self._brightness_received, {"_attr_brightness"}
)
add_topic(
self.add_subscription(
CONF_RGB_STATE_TOPIC,
self._rgb_received,
{"_attr_brightness", "_attr_color_mode", "_attr_rgb_color"},
)
add_topic(
self.add_subscription(
CONF_RGBW_STATE_TOPIC,
self._rgbw_received,
{"_attr_brightness", "_attr_color_mode", "_attr_rgbw_color"},
)
add_topic(
self.add_subscription(
CONF_RGBWW_STATE_TOPIC,
self._rgbww_received,
{"_attr_brightness", "_attr_color_mode", "_attr_rgbww_color"},
)
add_topic(
self.add_subscription(
CONF_COLOR_MODE_STATE_TOPIC, self._color_mode_received, {"_attr_color_mode"}
)
add_topic(
self.add_subscription(
CONF_COLOR_TEMP_STATE_TOPIC,
self._color_temp_received,
{"_attr_color_mode", "_attr_color_temp"},
)
add_topic(CONF_EFFECT_STATE_TOPIC, self._effect_received, {"_attr_effect"})
add_topic(
self.add_subscription(
CONF_EFFECT_STATE_TOPIC, self._effect_received, {"_attr_effect"}
)
self.add_subscription(
CONF_HS_STATE_TOPIC,
self._hs_received,
{"_attr_color_mode", "_attr_hs_color"},
)
add_topic(
self.add_subscription(
CONF_XY_STATE_TOPIC,
self._xy_received,
{"_attr_color_mode", "_attr_xy_color"},
)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
subscription.async_subscribe_topics_internal(self.hass, self._sub_state)

View File

@ -4,7 +4,6 @@ from __future__ import annotations
from collections.abc import Callable
from contextlib import suppress
from functools import partial
import logging
from typing import TYPE_CHECKING, Any, cast
@ -47,7 +46,7 @@ from homeassistant.const import (
CONF_XY,
STATE_ON,
)
from homeassistant.core import HassJobType, async_get_hass, callback
from homeassistant.core import async_get_hass, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.helpers.json import json_dumps
@ -61,7 +60,6 @@ from .. import subscription
from ..config import DEFAULT_QOS, DEFAULT_RETAIN, MQTT_RW_SCHEMA
from ..const import (
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_RETAIN,
CONF_STATE_TOPIC,
@ -490,40 +488,23 @@ class MqttLightJson(MqttEntity, LightEntity, RestoreEntity):
with suppress(KeyError):
self._attr_effect = cast(str, values["effect"])
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
#
if self._topic[CONF_STATE_TOPIC] is None:
return
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
self.add_subscription(
CONF_STATE_TOPIC,
self._state_received,
{
CONF_STATE_TOPIC: {
"topic": self._topic[CONF_STATE_TOPIC],
"msg_callback": partial(
self._message_callback,
self._state_received,
{
"_attr_brightness",
"_attr_color_temp",
"_attr_effect",
"_attr_hs_color",
"_attr_is_on",
"_attr_rgb_color",
"_attr_rgbw_color",
"_attr_rgbww_color",
"_attr_xy_color",
"color_mode",
},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
"_attr_brightness",
"_attr_color_temp",
"_attr_effect",
"_attr_hs_color",
"_attr_is_on",
"_attr_rgb_color",
"_attr_rgbw_color",
"_attr_rgbww_color",
"_attr_xy_color",
"color_mode",
},
)

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
from typing import Any
@ -29,7 +28,7 @@ from homeassistant.const import (
STATE_OFF,
STATE_ON,
)
from homeassistant.core import HassJobType, callback
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, TemplateVarsType
@ -37,13 +36,7 @@ import homeassistant.util.color as color_util
from .. import subscription
from ..config import MQTT_RW_SCHEMA
from ..const import (
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
PAYLOAD_NONE,
)
from ..const import CONF_COMMAND_TOPIC, CONF_STATE_TOPIC, PAYLOAD_NONE
from ..mixins import MqttEntity
from ..models import (
MqttCommandTemplate,
@ -254,35 +247,19 @@ class MqttLightTemplate(MqttEntity, LightEntity, RestoreEntity):
else:
_LOGGER.warning("Unsupported effect value received")
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
if self._topics[CONF_STATE_TOPIC] is None:
return
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
self.add_subscription(
CONF_STATE_TOPIC,
self._state_received,
{
"state_topic": {
"topic": self._topics[CONF_STATE_TOPIC],
"msg_callback": partial(
self._message_callback,
self._state_received,
{
"_attr_brightness",
"_attr_color_mode",
"_attr_color_temp",
"_attr_effect",
"_attr_hs_color",
"_attr_is_on",
},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
"_attr_brightness",
"_attr_color_mode",
"_attr_color_temp",
"_attr_effect",
"_attr_hs_color",
"_attr_is_on",
},
)

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
import re
from typing import Any
@ -19,7 +18,7 @@ from homeassistant.const import (
CONF_OPTIMISTIC,
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, TemplateVarsType
@ -29,9 +28,7 @@ from .config import MQTT_RW_SCHEMA
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_PAYLOAD_RESET,
CONF_QOS,
CONF_STATE_OPEN,
CONF_STATE_OPENING,
CONF_STATE_TOPIC,
@ -203,42 +200,20 @@ class MqttLock(MqttEntity, LockEntity):
self._attr_is_unlocking = payload == self._config[CONF_STATE_UNLOCKING]
self._attr_is_jammed = payload == self._config[CONF_STATE_JAMMED]
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, dict[str, Any]]
qos: int = self._config[CONF_QOS]
encoding: str | None = self._config[CONF_ENCODING] or None
if self._config.get(CONF_STATE_TOPIC) is None:
# Force into optimistic mode.
self._optimistic = True
return
topics = {
CONF_STATE_TOPIC: {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": partial(
self._message_callback,
self._message_received,
{
"_attr_is_jammed",
"_attr_is_locked",
"_attr_is_locking",
"_attr_is_open",
"_attr_is_opening",
"_attr_is_unlocking",
},
),
"entity_id": self.entity_id,
CONF_QOS: qos,
CONF_ENCODING: encoding,
"job_type": HassJobType.Callback,
}
}
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
topics,
self.add_subscription(
CONF_STATE_TOPIC,
self._message_received,
{
"_attr_is_jammed",
"_attr_is_locked",
"_attr_is_locking",
"_attr_is_open",
"_attr_is_opening",
"_attr_is_unlocking",
},
)
async def _subscribe_topics(self) -> None:

View File

@ -1071,6 +1071,7 @@ class MqttEntity(
self._attr_unique_id = config.get(CONF_UNIQUE_ID)
self._sub_state: dict[str, EntitySubscription] = {}
self._discovery = discovery_data is not None
self._subscriptions: dict[str, dict[str, Any]]
# Load config
self._setup_from_config(self._config)
@ -1097,7 +1098,14 @@ class MqttEntity(
async def async_added_to_hass(self) -> None:
"""Subscribe to MQTT events."""
await super().async_added_to_hass()
self._subscriptions = {}
self._prepare_subscribe_topics()
if self._subscriptions:
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
self._subscriptions,
)
await self._subscribe_topics()
await self.mqtt_async_added_to_hass()
@ -1122,7 +1130,14 @@ class MqttEntity(
self.attributes_prepare_discovery_update(config)
self.availability_prepare_discovery_update(config)
self.device_info_discovery_update(config)
self._subscriptions = {}
self._prepare_subscribe_topics()
if self._subscriptions:
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
self._subscriptions,
)
# Finalize MQTT subscriptions
await self.attributes_discovery_update(config)
@ -1212,6 +1227,7 @@ class MqttEntity(
"""(Re)Setup the entity."""
@abstractmethod
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
@ -1260,6 +1276,35 @@ class MqttEntity(
if attributes is not None and self._attrs_have_changed(attrs_snapshot):
mqtt_data.state_write_requests.write_state_request(self)
def add_subscription(
self,
state_topic_config_key: str,
msg_callback: Callable[[ReceiveMessage], None],
tracked_attributes: set[str] | None,
disable_encoding: bool = False,
) -> bool:
"""Add a subscription."""
qos: int = self._config[CONF_QOS]
encoding: str | None = None
if not disable_encoding:
encoding = self._config[CONF_ENCODING] or None
if (
state_topic_config_key in self._config
and self._config[state_topic_config_key] is not None
):
self._subscriptions[state_topic_config_key] = {
"topic": self._config[state_topic_config_key],
"msg_callback": partial(
self._message_callback, msg_callback, tracked_attributes
),
"entity_id": self.entity_id,
"qos": qos,
"encoding": encoding,
"job_type": HassJobType.Callback,
}
return True
return False
def update_device(
hass: HomeAssistant,

View File

@ -8,7 +8,7 @@ from homeassistant.components import notify
from homeassistant.components.notify import NotifyEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
@ -68,6 +68,7 @@ class MqttNotify(MqttEntity, NotifyEntity):
config.get(CONF_COMMAND_TEMPLATE), entity=self
).async_render
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
import voluptuous as vol
@ -26,7 +25,7 @@ from homeassistant.const import (
CONF_UNIT_OF_MEASUREMENT,
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
@ -36,9 +35,7 @@ from .config import MQTT_RW_SCHEMA
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_PAYLOAD_RESET,
CONF_QOS,
CONF_STATE_TOPIC,
)
from .mixins import MqttEntity, async_setup_entity_entry_helper
@ -193,30 +190,15 @@ class MqttNumber(MqttEntity, RestoreNumber):
self._attr_native_value = num_value
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
if self._config.get(CONF_STATE_TOPIC) is None:
if not self.add_subscription(
CONF_STATE_TOPIC, self._message_received, {"_attr_native_value"}
):
# Force into optimistic mode.
self._attr_assumed_state = True
return
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
"state_topic": {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": partial(
self._message_callback,
self._message_received,
{"_attr_native_value"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
},
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -10,7 +10,7 @@ from homeassistant.components import scene
from homeassistant.components.scene import Scene
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME, CONF_PAYLOAD_ON
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
@ -72,6 +72,7 @@ class MqttScene(
def _setup_from_config(self, config: ConfigType) -> None:
"""(Re)Setup the entity."""
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
import voluptuous as vol
@ -12,7 +11,7 @@ from homeassistant.components import select
from homeassistant.components.select import SelectEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME, CONF_OPTIMISTIC, CONF_VALUE_TEMPLATE
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.restore_state import RestoreEntity
@ -20,13 +19,7 @@ from homeassistant.helpers.typing import ConfigType
from . import subscription
from .config import MQTT_RW_SCHEMA
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
)
from .const import CONF_COMMAND_TEMPLATE, CONF_COMMAND_TOPIC, CONF_STATE_TOPIC
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import (
MqttCommandTemplate,
@ -133,30 +126,15 @@ class MqttSelect(MqttEntity, SelectEntity, RestoreEntity):
return
self._attr_current_option = payload
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
if self._config.get(CONF_STATE_TOPIC) is None:
if not self.add_subscription(
CONF_STATE_TOPIC, self._message_received, {"_attr_current_option"}
):
# Force into optimistic mode.
self._attr_assumed_state = True
return
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
"state_topic": {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": partial(
self._message_callback,
self._message_received,
{"_attr_current_option"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
},
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -4,9 +4,7 @@ from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timedelta
from functools import partial
import logging
from typing import Any
import voluptuous as vol
@ -31,13 +29,7 @@ from homeassistant.const import (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import (
CALLBACK_TYPE,
HassJobType,
HomeAssistant,
State,
callback,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, State, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_call_later
@ -46,7 +38,7 @@ from homeassistant.util import dt as dt_util
from . import subscription
from .config import MQTT_RO_SCHEMA
from .const import CONF_ENCODING, CONF_QOS, CONF_STATE_TOPIC, PAYLOAD_NONE
from .const import CONF_STATE_TOPIC, PAYLOAD_NONE
from .mixins import MqttAvailabilityMixin, MqttEntity, async_setup_entity_entry_helper
from .models import (
MqttValueTemplate,
@ -289,25 +281,13 @@ class MqttSensor(MqttEntity, RestoreSensor):
if CONF_LAST_RESET_VALUE_TEMPLATE in self._config:
_update_last_reset(msg)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, dict[str, Any]] = {}
topics["state_topic"] = {
"topic": self._config[CONF_STATE_TOPIC],
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{"_attr_native_value", "_attr_last_reset", "_expired"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
self.add_subscription(
CONF_STATE_TOPIC,
self._state_message_received,
{"_attr_native_value", "_attr_last_reset", "_expired"},
)
async def _subscribe_topics(self) -> None:

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
from typing import Any, cast
@ -28,7 +27,7 @@ from homeassistant.const import (
CONF_PAYLOAD_OFF,
CONF_PAYLOAD_ON,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.json import json_dumps
@ -41,8 +40,6 @@ from .config import MQTT_RW_SCHEMA
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
CONF_STATE_VALUE_TEMPLATE,
PAYLOAD_EMPTY_JSON,
@ -261,30 +258,17 @@ class MqttSiren(MqttEntity, SirenEntity):
self._extra_attributes = dict(self._extra_attributes)
self._update(process_turn_on_params(self, params))
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
if self._config.get(CONF_STATE_TOPIC) is None:
if not self.add_subscription(
CONF_STATE_TOPIC,
self._state_message_received,
{"_attr_is_on", "_extra_attributes"},
):
# Force into optimistic mode.
self._optimistic = True
return
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
CONF_STATE_TOPIC: {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{"_attr_is_on", "_extra_attributes"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
},
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
from typing import Any
import voluptuous as vol
@ -20,7 +19,7 @@ from homeassistant.const import (
CONF_VALUE_TEMPLATE,
STATE_ON,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.restore_state import RestoreEntity
@ -29,13 +28,7 @@ from homeassistant.helpers.typing import ConfigType
from . import subscription
from .config import MQTT_RW_SCHEMA
from .const import (
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
PAYLOAD_NONE,
)
from .const import CONF_COMMAND_TOPIC, CONF_STATE_TOPIC, PAYLOAD_NONE
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import MqttValueTemplate, ReceiveMessage
from .schemas import MQTT_ENTITY_COMMON_SCHEMA
@ -124,30 +117,15 @@ class MqttSwitch(MqttEntity, SwitchEntity, RestoreEntity):
elif payload == PAYLOAD_NONE:
self._attr_is_on = None
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
if self._config.get(CONF_STATE_TOPIC) is None:
if not self.add_subscription(
CONF_STATE_TOPIC, self._state_message_received, {"_attr_is_on"}
):
# Force into optimistic mode.
self._optimistic = True
return
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
CONF_STATE_TOPIC: {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{"_attr_is_on"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
},
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
import re
from typing import Any
@ -20,23 +19,16 @@ from homeassistant.const import (
CONF_VALUE_TEMPLATE,
MAX_LENGTH_STATE_STATE,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
from . import subscription
from .config import MQTT_RW_SCHEMA
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_STATE_TOPIC,
)
from .const import CONF_COMMAND_TEMPLATE, CONF_COMMAND_TOPIC, CONF_STATE_TOPIC
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import (
MessageCallbackType,
MqttCommandTemplate,
MqttValueTemplate,
PublishPayloadType,
@ -163,39 +155,15 @@ class MqttTextEntity(MqttEntity, TextEntity):
return
self._attr_native_value = payload
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, Any] = {}
def add_subscription(
topics: dict[str, Any],
topic: str,
msg_callback: MessageCallbackType,
tracked_attributes: set[str],
) -> None:
if self._config.get(topic) is not None:
topics[topic] = {
"topic": self._config[topic],
"msg_callback": partial(
self._message_callback, msg_callback, tracked_attributes
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
add_subscription(
topics,
self.add_subscription(
CONF_STATE_TOPIC,
self._handle_state_message_received,
{"_attr_native_value"},
)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
subscription.async_subscribe_topics_internal(self.hass, self._sub_state)

View File

@ -2,7 +2,6 @@
from __future__ import annotations
from functools import partial
import logging
from typing import Any, TypedDict, cast
@ -16,7 +15,7 @@ from homeassistant.components.update import (
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DEVICE_CLASS, CONF_NAME, CONF_VALUE_TEMPLATE
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.restore_state import RestoreEntity
@ -25,16 +24,9 @@ from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
from . import subscription
from .config import DEFAULT_RETAIN, MQTT_RO_SCHEMA
from .const import (
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_RETAIN,
CONF_STATE_TOPIC,
PAYLOAD_EMPTY_JSON,
)
from .const import CONF_COMMAND_TOPIC, CONF_RETAIN, CONF_STATE_TOPIC, PAYLOAD_EMPTY_JSON
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import MessageCallbackType, MqttValueTemplate, ReceiveMessage
from .models import MqttValueTemplate, ReceiveMessage
from .schemas import MQTT_ENTITY_COMMON_SCHEMA
from .util import valid_publish_topic, valid_subscribe_topic
@ -210,30 +202,10 @@ class MqttUpdate(MqttEntity, UpdateEntity, RestoreEntity):
if isinstance(latest_version, str) and latest_version != "":
self._attr_latest_version = latest_version
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, Any] = {}
def add_subscription(
topics: dict[str, Any],
topic: str,
msg_callback: MessageCallbackType,
tracked_attributes: set[str],
) -> None:
if self._config.get(topic) is not None:
topics[topic] = {
"topic": self._config[topic],
"msg_callback": partial(
self._message_callback, msg_callback, tracked_attributes
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
add_subscription(
topics,
self.add_subscription(
CONF_STATE_TOPIC,
self._handle_state_message_received,
{
@ -245,17 +217,12 @@ class MqttUpdate(MqttEntity, UpdateEntity, RestoreEntity):
"_entity_picture",
},
)
add_subscription(
topics,
self.add_subscription(
CONF_LATEST_VERSION_TOPIC,
self._handle_latest_version_received,
{"_attr_latest_version"},
)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
)
async def _subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
subscription.async_subscribe_topics_internal(self.hass, self._sub_state)

View File

@ -8,7 +8,6 @@
from __future__ import annotations
from collections.abc import Callable
from functools import partial
import logging
from typing import Any, cast
@ -31,7 +30,7 @@ from homeassistant.const import (
STATE_IDLE,
STATE_PAUSED,
)
from homeassistant.core import HassJobType, HomeAssistant, async_get_hass, callback
from homeassistant.core import HomeAssistant, async_get_hass, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
@ -43,8 +42,6 @@ from . import subscription
from .config import MQTT_BASE_SCHEMA
from .const import (
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_QOS,
CONF_RETAIN,
CONF_SCHEMA,
CONF_STATE_TOPIC,
@ -331,25 +328,13 @@ class MqttStateVacuum(MqttEntity, StateVacuumEntity):
del payload[STATE]
self._update_state_attributes(payload)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, Any] = {}
if state_topic := self._config.get(CONF_STATE_TOPIC):
topics["state_position_topic"] = {
"topic": state_topic,
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{"_attr_battery_level", "_attr_fan_speed", "_attr_state"},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
self.add_subscription(
CONF_STATE_TOPIC,
self._state_message_received,
{"_attr_battery_level", "_attr_fan_speed", "_attr_state"},
)
async def _subscribe_topics(self) -> None:

View File

@ -3,7 +3,6 @@
from __future__ import annotations
from contextlib import suppress
from functools import partial
import logging
from typing import Any
@ -26,7 +25,7 @@ from homeassistant.const import (
STATE_OPEN,
STATE_OPENING,
)
from homeassistant.core import HassJobType, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType
@ -41,13 +40,11 @@ from .config import MQTT_BASE_SCHEMA
from .const import (
CONF_COMMAND_TEMPLATE,
CONF_COMMAND_TOPIC,
CONF_ENCODING,
CONF_PAYLOAD_CLOSE,
CONF_PAYLOAD_OPEN,
CONF_PAYLOAD_STOP,
CONF_POSITION_CLOSED,
CONF_POSITION_OPEN,
CONF_QOS,
CONF_RETAIN,
CONF_STATE_CLOSED,
CONF_STATE_CLOSING,
@ -337,31 +334,18 @@ class MqttValve(MqttEntity, ValveEntity):
else:
self._process_binary_valve_update(msg, state_payload)
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics = {}
if self._config.get(CONF_STATE_TOPIC):
topics["state_topic"] = {
"topic": self._config.get(CONF_STATE_TOPIC),
"msg_callback": partial(
self._message_callback,
self._state_message_received,
{
"_attr_current_valve_position",
"_attr_is_closed",
"_attr_is_closing",
"_attr_is_opening",
},
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
"job_type": HassJobType.Callback,
}
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
self.add_subscription(
CONF_STATE_TOPIC,
self._state_message_received,
{
"_attr_current_valve_position",
"_attr_is_closed",
"_attr_is_closing",
"_attr_is_opening",
},
)
async def _subscribe_topics(self) -> None:

View File

@ -281,18 +281,17 @@ class MqttWaterHeater(MqttTemperatureControlEntity, WaterHeaterEntity):
assert isinstance(payload, str)
self._attr_current_operation = payload
@callback
def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, dict[str, Any]] = {}
# add subscriptions for WaterHeaterEntity
self.add_subscription(
topics,
CONF_MODE_STATE_TOPIC,
self._handle_current_mode_received,
{"_attr_current_operation"},
)
self.prepare_subscribe_topics(topics)
# add subscriptions for MqttTemperatureControlEntity
self.prepare_subscribe_topics()
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperature."""