Fix blocking IO calls in mqtt client setup (#119647)

This commit is contained in:
Jan Bouwhuis 2024-06-14 00:47:38 +02:00 committed by GitHub
parent cd80b9b318
commit a992654a8b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 8 deletions

View File

@ -44,7 +44,7 @@ class AsyncMQTTClient(MQTTClient):
that is not needed since we are running in an async event loop. that is not needed since we are running in an async event loop.
""" """
def async_setup(self) -> None: def setup(self) -> None:
"""Set up the client. """Set up the client.
All the threading locks are replaced with NullLock All the threading locks are replaced with NullLock

View File

@ -277,9 +277,22 @@ class Subscription:
class MqttClientSetup: class MqttClientSetup:
"""Helper class to setup the paho mqtt client from config.""" """Helper class to setup the paho mqtt client from config."""
def __init__(self, config: ConfigType) -> None: _client: AsyncMQTTClient
"""Initialize the MQTT client setup helper."""
def __init__(self, config: ConfigType) -> None:
"""Initialize the MQTT client setup helper.
self.setup must be run in an executor job.
"""
self._config = config
def setup(self) -> None:
"""Set up the MQTT client.
The setup of the MQTT client should be run in an executor job,
because it accesses files, so it does IO.
"""
# We don't import on the top because some integrations # We don't import on the top because some integrations
# should be able to optionally rely on MQTT. # should be able to optionally rely on MQTT.
import paho.mqtt.client as mqtt # pylint: disable=import-outside-toplevel import paho.mqtt.client as mqtt # pylint: disable=import-outside-toplevel
@ -287,6 +300,7 @@ class MqttClientSetup:
# pylint: disable-next=import-outside-toplevel # pylint: disable-next=import-outside-toplevel
from .async_client import AsyncMQTTClient from .async_client import AsyncMQTTClient
config = self._config
if (protocol := config.get(CONF_PROTOCOL, DEFAULT_PROTOCOL)) == PROTOCOL_31: if (protocol := config.get(CONF_PROTOCOL, DEFAULT_PROTOCOL)) == PROTOCOL_31:
proto = mqtt.MQTTv31 proto = mqtt.MQTTv31
elif protocol == PROTOCOL_5: elif protocol == PROTOCOL_5:
@ -298,11 +312,14 @@ class MqttClientSetup:
# PAHO MQTT relies on the MQTT server to generate random client IDs. # PAHO MQTT relies on the MQTT server to generate random client IDs.
# However, that feature is not mandatory so we generate our own. # However, that feature is not mandatory so we generate our own.
client_id = mqtt.base62(uuid.uuid4().int, padding=22) client_id = mqtt.base62(uuid.uuid4().int, padding=22)
transport = config.get(CONF_TRANSPORT, DEFAULT_TRANSPORT) transport: str = config.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
self._client = AsyncMQTTClient( self._client = AsyncMQTTClient(
client_id, protocol=proto, transport=transport, reconnect_on_failure=False client_id,
protocol=proto,
transport=transport,
reconnect_on_failure=False,
) )
self._client.async_setup() self._client.setup()
# Enable logging # Enable logging
self._client.enable_logger() self._client.enable_logger()
@ -544,7 +561,9 @@ class MQTT:
self.hass, "homeassistant.components.mqtt.async_client" self.hass, "homeassistant.components.mqtt.async_client"
) )
mqttc = MqttClientSetup(self.conf).client mqttc_setup = MqttClientSetup(self.conf)
await self.hass.async_add_executor_job(mqttc_setup.setup)
mqttc = mqttc_setup.client
# on_socket_unregister_write and _async_on_socket_close # on_socket_unregister_write and _async_on_socket_close
# are only ever called in the event loop # are only ever called in the event loop
mqttc.on_socket_close = self._async_on_socket_close mqttc.on_socket_close = self._async_on_socket_close

View File

@ -834,7 +834,9 @@ def try_connection(
# should be able to optionally rely on MQTT. # should be able to optionally rely on MQTT.
import paho.mqtt.client as mqtt # pylint: disable=import-outside-toplevel import paho.mqtt.client as mqtt # pylint: disable=import-outside-toplevel
client = MqttClientSetup(user_input).client mqtt_client_setup = MqttClientSetup(user_input)
mqtt_client_setup.setup()
client = mqtt_client_setup.client
result: queue.Queue[bool] = queue.Queue(maxsize=1) result: queue.Queue[bool] = queue.Queue(maxsize=1)