mirror of
https://github.com/home-assistant/core.git
synced 2025-07-13 08:17:08 +00:00
Fix MQTT discovery cooldown too short with large setup (#116550)
* Fix MQTT discovery cooldown too short with large setup * Set to 5 sec * Only change the discovery cooldown * Fire immediatly when teh debouncing period is over
This commit is contained in:
parent
ef2ae7b600
commit
7fd60ddba4
@ -82,7 +82,7 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
DISCOVERY_COOLDOWN = 2
|
DISCOVERY_COOLDOWN = 5
|
||||||
INITIAL_SUBSCRIBE_COOLDOWN = 1.0
|
INITIAL_SUBSCRIBE_COOLDOWN = 1.0
|
||||||
SUBSCRIBE_COOLDOWN = 0.1
|
SUBSCRIBE_COOLDOWN = 0.1
|
||||||
UNSUBSCRIBE_COOLDOWN = 0.1
|
UNSUBSCRIBE_COOLDOWN = 0.1
|
||||||
@ -348,6 +348,12 @@ class EnsureJobAfterCooldown:
|
|||||||
self._task = create_eager_task(self._async_job())
|
self._task = create_eager_task(self._async_job())
|
||||||
self._task.add_done_callback(self._async_task_done)
|
self._task.add_done_callback(self._async_task_done)
|
||||||
|
|
||||||
|
async def async_fire(self) -> None:
|
||||||
|
"""Execute the job immediately."""
|
||||||
|
if self._task:
|
||||||
|
await self._task
|
||||||
|
self._async_execute()
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def _async_cancel_timer(self) -> None:
|
def _async_cancel_timer(self) -> None:
|
||||||
"""Cancel any pending task."""
|
"""Cancel any pending task."""
|
||||||
@ -840,7 +846,7 @@ class MQTT:
|
|||||||
|
|
||||||
for topic, qos in subscriptions.items():
|
for topic, qos in subscriptions.items():
|
||||||
_LOGGER.debug("Subscribing to %s, mid: %s, qos: %s", topic, mid, qos)
|
_LOGGER.debug("Subscribing to %s, mid: %s, qos: %s", topic, mid, qos)
|
||||||
self._last_subscribe = time.time()
|
self._last_subscribe = time.monotonic()
|
||||||
|
|
||||||
if result == 0:
|
if result == 0:
|
||||||
await self._wait_for_mid(mid)
|
await self._wait_for_mid(mid)
|
||||||
@ -870,6 +876,8 @@ class MQTT:
|
|||||||
await self._ha_started.wait() # Wait for Home Assistant to start
|
await self._ha_started.wait() # Wait for Home Assistant to start
|
||||||
await self._discovery_cooldown() # Wait for MQTT discovery to cool down
|
await self._discovery_cooldown() # Wait for MQTT discovery to cool down
|
||||||
# Update subscribe cooldown period to a shorter time
|
# Update subscribe cooldown period to a shorter time
|
||||||
|
# and make sure we flush the debouncer
|
||||||
|
await self._subscribe_debouncer.async_fire()
|
||||||
self._subscribe_debouncer.set_timeout(SUBSCRIBE_COOLDOWN)
|
self._subscribe_debouncer.set_timeout(SUBSCRIBE_COOLDOWN)
|
||||||
await self.async_publish(
|
await self.async_publish(
|
||||||
topic=birth_message.topic,
|
topic=birth_message.topic,
|
||||||
@ -1115,7 +1123,7 @@ class MQTT:
|
|||||||
|
|
||||||
async def _discovery_cooldown(self) -> None:
|
async def _discovery_cooldown(self) -> None:
|
||||||
"""Wait until all discovery and subscriptions are processed."""
|
"""Wait until all discovery and subscriptions are processed."""
|
||||||
now = time.time()
|
now = time.monotonic()
|
||||||
# Reset discovery and subscribe cooldowns
|
# Reset discovery and subscribe cooldowns
|
||||||
self._mqtt_data.last_discovery = now
|
self._mqtt_data.last_discovery = now
|
||||||
self._last_subscribe = now
|
self._last_subscribe = now
|
||||||
@ -1127,7 +1135,7 @@ class MQTT:
|
|||||||
)
|
)
|
||||||
while now < wait_until:
|
while now < wait_until:
|
||||||
await asyncio.sleep(wait_until - now)
|
await asyncio.sleep(wait_until - now)
|
||||||
now = time.time()
|
now = time.monotonic()
|
||||||
last_discovery = self._mqtt_data.last_discovery
|
last_discovery = self._mqtt_data.last_discovery
|
||||||
last_subscribe = (
|
last_subscribe = (
|
||||||
now if self._pending_subscriptions else self._last_subscribe
|
now if self._pending_subscriptions else self._last_subscribe
|
||||||
|
@ -177,7 +177,7 @@ async def async_start( # noqa: C901
|
|||||||
@callback
|
@callback
|
||||||
def async_discovery_message_received(msg: ReceiveMessage) -> None: # noqa: C901
|
def async_discovery_message_received(msg: ReceiveMessage) -> None: # noqa: C901
|
||||||
"""Process the received message."""
|
"""Process the received message."""
|
||||||
mqtt_data.last_discovery = time.time()
|
mqtt_data.last_discovery = time.monotonic()
|
||||||
payload = msg.payload
|
payload = msg.payload
|
||||||
topic = msg.topic
|
topic = msg.topic
|
||||||
topic_trimmed = topic.replace(f"{discovery_topic}/", "", 1)
|
topic_trimmed = topic.replace(f"{discovery_topic}/", "", 1)
|
||||||
@ -370,7 +370,7 @@ async def async_start( # noqa: C901
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
mqtt_data.last_discovery = time.time()
|
mqtt_data.last_discovery = time.monotonic()
|
||||||
mqtt_integrations = await async_get_mqtt(hass)
|
mqtt_integrations = await async_get_mqtt(hass)
|
||||||
|
|
||||||
for integration, topics in mqtt_integrations.items():
|
for integration, topics in mqtt_integrations.items():
|
||||||
|
Loading…
x
Reference in New Issue
Block a user