mirror of
https://github.com/home-assistant/core.git
synced 2025-07-22 20:57:21 +00:00
Ensure MQTT resubscribes happen before birth message (#116471)
This commit is contained in:
parent
6c446b4e59
commit
2e9b1916c0
@ -877,6 +877,22 @@ class MQTT:
|
|||||||
|
|
||||||
await self._wait_for_mid(mid)
|
await self._wait_for_mid(mid)
|
||||||
|
|
||||||
|
async def _async_resubscribe_and_publish_birth_message(
|
||||||
|
self, birth_message: PublishMessage
|
||||||
|
) -> None:
|
||||||
|
"""Resubscribe to all topics and publish birth message."""
|
||||||
|
await self._async_perform_subscriptions()
|
||||||
|
await self._ha_started.wait() # Wait for Home Assistant to start
|
||||||
|
await self._discovery_cooldown() # Wait for MQTT discovery to cool down
|
||||||
|
# Update subscribe cooldown period to a shorter time
|
||||||
|
self._subscribe_debouncer.set_timeout(SUBSCRIBE_COOLDOWN)
|
||||||
|
await self.async_publish(
|
||||||
|
topic=birth_message.topic,
|
||||||
|
payload=birth_message.payload,
|
||||||
|
qos=birth_message.qos,
|
||||||
|
retain=birth_message.retain,
|
||||||
|
)
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def _async_mqtt_on_connect(
|
def _async_mqtt_on_connect(
|
||||||
self,
|
self,
|
||||||
@ -918,36 +934,33 @@ class MQTT:
|
|||||||
result_code,
|
result_code,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.hass.async_create_task(self._async_resubscribe())
|
self._async_queue_resubscribe()
|
||||||
|
birth: dict[str, Any]
|
||||||
if birth := self.conf.get(CONF_BIRTH_MESSAGE, DEFAULT_BIRTH):
|
if birth := self.conf.get(CONF_BIRTH_MESSAGE, DEFAULT_BIRTH):
|
||||||
|
|
||||||
async def publish_birth_message(birth_message: PublishMessage) -> None:
|
|
||||||
await self._ha_started.wait() # Wait for Home Assistant to start
|
|
||||||
await self._discovery_cooldown() # Wait for MQTT discovery to cool down
|
|
||||||
# Update subscribe cooldown period to a shorter time
|
|
||||||
self._subscribe_debouncer.set_timeout(SUBSCRIBE_COOLDOWN)
|
|
||||||
await self.async_publish(
|
|
||||||
topic=birth_message.topic,
|
|
||||||
payload=birth_message.payload,
|
|
||||||
qos=birth_message.qos,
|
|
||||||
retain=birth_message.retain,
|
|
||||||
)
|
|
||||||
|
|
||||||
birth_message = PublishMessage(**birth)
|
birth_message = PublishMessage(**birth)
|
||||||
self.config_entry.async_create_background_task(
|
self.config_entry.async_create_background_task(
|
||||||
self.hass,
|
self.hass,
|
||||||
publish_birth_message(birth_message),
|
self._async_resubscribe_and_publish_birth_message(birth_message),
|
||||||
name="mqtt birth message",
|
name="mqtt re-subscribe and birth",
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Update subscribe cooldown period to a shorter time
|
# Update subscribe cooldown period to a shorter time
|
||||||
|
self.config_entry.async_create_background_task(
|
||||||
|
self.hass,
|
||||||
|
self._async_perform_subscriptions(),
|
||||||
|
name="mqtt re-subscribe",
|
||||||
|
)
|
||||||
self._subscribe_debouncer.set_timeout(SUBSCRIBE_COOLDOWN)
|
self._subscribe_debouncer.set_timeout(SUBSCRIBE_COOLDOWN)
|
||||||
|
|
||||||
self._async_connection_result(True)
|
self._async_connection_result(True)
|
||||||
|
|
||||||
async def _async_resubscribe(self) -> None:
|
@callback
|
||||||
"""Resubscribe on reconnect."""
|
def _async_queue_resubscribe(self) -> None:
|
||||||
|
"""Queue subscriptions on reconnect.
|
||||||
|
|
||||||
|
self._async_perform_subscriptions must be called
|
||||||
|
after this method to actually subscribe.
|
||||||
|
"""
|
||||||
self._max_qos.clear()
|
self._max_qos.clear()
|
||||||
self._retained_topics.clear()
|
self._retained_topics.clear()
|
||||||
# Group subscriptions to only re-subscribe once for each topic.
|
# Group subscriptions to only re-subscribe once for each topic.
|
||||||
@ -962,7 +975,6 @@ class MQTT:
|
|||||||
],
|
],
|
||||||
queue_only=True,
|
queue_only=True,
|
||||||
)
|
)
|
||||||
await self._async_perform_subscriptions()
|
|
||||||
|
|
||||||
@lru_cache(None) # pylint: disable=method-cache-max-size-none
|
@lru_cache(None) # pylint: disable=method-cache-max-size-none
|
||||||
def _matching_subscriptions(self, topic: str) -> list[Subscription]:
|
def _matching_subscriptions(self, topic: str) -> list[Subscription]:
|
||||||
@ -1049,7 +1061,9 @@ class MQTT:
|
|||||||
# The callback signature for on_unsubscribe is different from on_subscribe
|
# The callback signature for on_unsubscribe is different from on_subscribe
|
||||||
# see https://github.com/eclipse/paho.mqtt.python/issues/687
|
# see https://github.com/eclipse/paho.mqtt.python/issues/687
|
||||||
# properties and reasoncodes are not used in Home Assistant
|
# properties and reasoncodes are not used in Home Assistant
|
||||||
self.hass.async_create_task(self._mqtt_handle_mid(mid))
|
self.config_entry.async_create_task(
|
||||||
|
self.hass, self._mqtt_handle_mid(mid), name=f"mqtt handle mid {mid}"
|
||||||
|
)
|
||||||
|
|
||||||
async def _mqtt_handle_mid(self, mid: int) -> None:
|
async def _mqtt_handle_mid(self, mid: int) -> None:
|
||||||
# Create the mid event if not created, either _mqtt_handle_mid or _wait_for_mid
|
# Create the mid event if not created, either _mqtt_handle_mid or _wait_for_mid
|
||||||
|
Loading…
x
Reference in New Issue
Block a user