diff --git a/homeassistant/components/mqtt/__init__.py b/homeassistant/components/mqtt/__init__.py index a43169c3afb..d7d11e6f49a 100644 --- a/homeassistant/components/mqtt/__init__.py +++ b/homeassistant/components/mqtt/__init__.py @@ -31,7 +31,7 @@ from homeassistant.const import ( EVENT_HOMEASSISTANT_STOP, ) from homeassistant.const import CONF_UNIQUE_ID # noqa: F401 -from homeassistant.core import CoreState, Event, ServiceCall, callback +from homeassistant.core import CoreState, Event, HassJob, ServiceCall, callback from homeassistant.exceptions import HomeAssistantError, Unauthorized from homeassistant.helpers import config_validation as cv, event, template from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send @@ -630,7 +630,7 @@ class Subscription: topic: str = attr.ib() matcher: Any = attr.ib() - callback: MessageCallbackType = attr.ib() + job: HassJob = attr.ib() qos: int = attr.ib(default=0) encoding: str = attr.ib(default="utf-8") @@ -839,7 +839,7 @@ class MQTT: raise HomeAssistantError("Topic needs to be a string!") subscription = Subscription( - topic, _matcher_for_topic(topic), msg_callback, qos, encoding + topic, _matcher_for_topic(topic), HassJob(msg_callback), qos, encoding ) self.subscriptions.append(subscription) self._matching_subscriptions.cache_clear() @@ -978,12 +978,12 @@ class MQTT: msg.payload, msg.topic, subscription.encoding, - subscription.callback, + subscription.job, ) continue - self.hass.async_run_job( - subscription.callback, + self.hass.async_run_hass_job( + subscription.job, Message( msg.topic, payload, @@ -1479,14 +1479,16 @@ async def websocket_subscribe(hass, connection, msg): def async_subscribe_connection_status(hass, connection_status_callback): """Subscribe to MQTT connection changes.""" + connection_status_callback_job = HassJob(connection_status_callback) + @callback def connected(): - hass.async_add_job(connection_status_callback, True) + hass.async_add_hass_job(connection_status_callback_job, True) @callback def disconnected(): _LOGGER.error("Calling connection_status_callback, False") - hass.async_add_job(connection_status_callback, False) + hass.async_add_hass_job(connection_status_callback_job, False) subscriptions = { "connect": async_dispatcher_connect(hass, MQTT_CONNECTED, connected),