Convert mqtt to use a timer instead of task sleep loop (#118666)

This commit is contained in:
J. Nick Koston 2024-06-03 11:19:19 -05:00 committed by GitHub
parent 099ad77078
commit 9cb113e5d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -465,7 +465,7 @@ class MQTT:
self._subscribe_debouncer = EnsureJobAfterCooldown(
INITIAL_SUBSCRIBE_COOLDOWN, self._async_perform_subscriptions
)
self._misc_task: asyncio.Task | None = None
self._misc_timer: asyncio.TimerHandle | None = None
self._reconnect_task: asyncio.Task | None = None
self._should_reconnect: bool = True
self._available_future: asyncio.Future[bool] | None = None
@ -563,14 +563,6 @@ class MQTT:
self._mqttc = mqttc
async def _misc_loop(self) -> None:
"""Start the MQTT client misc loop."""
# pylint: disable=import-outside-toplevel
import paho.mqtt.client as mqtt
while self._mqttc.loop_misc() == mqtt.MQTT_ERR_SUCCESS:
await asyncio.sleep(1)
@callback
def _async_reader_callback(self, client: mqtt.Client) -> None:
"""Handle reading data from the socket."""
@ -578,13 +570,22 @@ class MQTT:
self._async_on_disconnect(status)
@callback
def _async_start_misc_loop(self) -> None:
"""Start the misc loop."""
if self._misc_task is None or self._misc_task.done():
_LOGGER.debug("%s: Starting client misc loop", self.config_entry.title)
self._misc_task = self.config_entry.async_create_background_task(
self.hass, self._misc_loop(), name="mqtt misc loop"
)
def _async_start_misc_periodic(self) -> None:
"""Start the misc periodic."""
assert self._misc_timer is None, "Misc periodic already started"
_LOGGER.debug("%s: Starting client misc loop", self.config_entry.title)
# pylint: disable=import-outside-toplevel
import paho.mqtt.client as mqtt
# Inner function to avoid having to check late import
# each time the function is called.
@callback
def _async_misc() -> None:
"""Start the MQTT client misc loop."""
if self._mqttc.loop_misc() == mqtt.MQTT_ERR_SUCCESS:
self._misc_timer = self.loop.call_at(self.loop.time() + 1, _async_misc)
self._misc_timer = self.loop.call_at(self.loop.time() + 1, _async_misc)
def _increase_socket_buffer_size(self, sock: SocketType) -> None:
"""Increase the socket buffer size."""
@ -635,7 +636,8 @@ class MQTT:
if fileno > -1:
self._increase_socket_buffer_size(sock)
self.loop.add_reader(sock, partial(self._async_reader_callback, client))
self._async_start_misc_loop()
if not self._misc_timer:
self._async_start_misc_periodic()
# Try to consume the buffer right away so it doesn't fill up
# since add_reader will wait for the next loop iteration
self._async_reader_callback(client)
@ -652,8 +654,9 @@ class MQTT:
self._async_connection_result(False)
if fileno > -1:
self.loop.remove_reader(sock)
if self._misc_task is not None and not self._misc_task.done():
self._misc_task.cancel()
if self._misc_timer:
self._misc_timer.cancel()
self._misc_timer = None
@callback
def _async_writer_callback(self, client: mqtt.Client) -> None: