diff --git a/homeassistant/components/manual_mqtt/alarm_control_panel.py b/homeassistant/components/manual_mqtt/alarm_control_panel.py index fd6adb009aa..adb251bd71a 100644 --- a/homeassistant/components/manual_mqtt/alarm_control_panel.py +++ b/homeassistant/components/manual_mqtt/alarm_control_panel.py @@ -187,13 +187,19 @@ PLATFORM_SCHEMA = vol.Schema( ) -def setup_platform( +async def async_setup_platform( hass: HomeAssistant, config: ConfigType, add_entities: AddEntitiesCallback, discovery_info: DiscoveryInfoType | None = None, ) -> None: """Set up the manual MQTT alarm platform.""" + # Make sure MQTT integration is enabled and the client is available + # We cannot count on dependencies as the alarm_control_panel platform setup + # also will be triggered when mqtt is loading the `alarm_control_panel` platform + if not await mqtt.async_wait_for_mqtt_client(hass): + _LOGGER.error("MQTT integration is not available") + return add_entities( [ ManualMQTTAlarm( diff --git a/homeassistant/components/mqtt/__init__.py b/homeassistant/components/mqtt/__init__.py index e588acec53a..58260833559 100644 --- a/homeassistant/components/mqtt/__init__.py +++ b/homeassistant/components/mqtt/__init__.py @@ -68,6 +68,7 @@ from .const import ( # noqa: F401 CONF_WS_HEADERS, CONF_WS_PATH, DATA_MQTT, + DATA_MQTT_AVAILABLE, DEFAULT_DISCOVERY, DEFAULT_ENCODING, DEFAULT_PREFIX, @@ -87,8 +88,9 @@ from .models import ( # noqa: F401 ReceiveMessage, ReceivePayloadType, ) -from .util import ( +from .util import ( # noqa: F401 async_create_certificate_temp_files, + async_wait_for_mqtt_client, get_mqtt_data, mqtt_config_entry_enabled, valid_publish_topic, @@ -183,34 +185,54 @@ async def _async_config_entry_updated(hass: HomeAssistant, entry: ConfigEntry) - async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Load a config entry.""" - conf = dict(entry.data) - # Fetch configuration - hass_config = await conf_util.async_hass_config_yaml(hass) - mqtt_yaml = PLATFORM_CONFIG_SCHEMA_BASE(hass_config.get(DOMAIN, {})) - client = MQTT(hass, entry, conf) - if DOMAIN in hass.data: - mqtt_data = get_mqtt_data(hass) - mqtt_data.config = mqtt_yaml - mqtt_data.client = client - else: - # Initial setup - websocket_api.async_register_command(hass, websocket_subscribe) - websocket_api.async_register_command(hass, websocket_mqtt_info) - hass.data[DATA_MQTT] = mqtt_data = MqttData(config=mqtt_yaml, client=client) - client.start(mqtt_data) + conf: dict[str, Any] + mqtt_data: MqttData - await async_create_certificate_temp_files(hass, dict(entry.data)) - # Restore saved subscriptions - if mqtt_data.subscriptions_to_restore: - mqtt_data.client.async_restore_tracked_subscriptions( - mqtt_data.subscriptions_to_restore + async def _setup_client() -> tuple[MqttData, dict[str, Any]]: + """Set up the MQTT client.""" + # Fetch configuration + conf = dict(entry.data) + hass_config = await conf_util.async_hass_config_yaml(hass) + mqtt_yaml = PLATFORM_CONFIG_SCHEMA_BASE(hass_config.get(DOMAIN, {})) + client = MQTT(hass, entry, conf) + if DOMAIN in hass.data: + mqtt_data = get_mqtt_data(hass) + mqtt_data.config = mqtt_yaml + mqtt_data.client = client + else: + # Initial setup + websocket_api.async_register_command(hass, websocket_subscribe) + websocket_api.async_register_command(hass, websocket_mqtt_info) + hass.data[DATA_MQTT] = mqtt_data = MqttData(config=mqtt_yaml, client=client) + client.start(mqtt_data) + + await async_create_certificate_temp_files(hass, dict(entry.data)) + # Restore saved subscriptions + if mqtt_data.subscriptions_to_restore: + mqtt_data.client.async_restore_tracked_subscriptions( + mqtt_data.subscriptions_to_restore + ) + mqtt_data.subscriptions_to_restore = [] + mqtt_data.reload_dispatchers.append( + entry.add_update_listener(_async_config_entry_updated) ) - mqtt_data.subscriptions_to_restore = [] - mqtt_data.reload_dispatchers.append( - entry.add_update_listener(_async_config_entry_updated) - ) - await mqtt_data.client.async_connect() + await mqtt_data.client.async_connect() + return (mqtt_data, conf) + + client_available: asyncio.Future[bool] + if DATA_MQTT_AVAILABLE not in hass.data: + client_available = hass.data[DATA_MQTT_AVAILABLE] = asyncio.Future() + else: + client_available = hass.data[DATA_MQTT_AVAILABLE] + + setup_ok: bool = False + try: + mqtt_data, conf = await _setup_client() + setup_ok = True + finally: + if not client_available.done(): + client_available.set_result(setup_ok) async def async_publish_service(call: ServiceCall) -> None: """Handle MQTT publish service calls.""" @@ -565,6 +587,9 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: registry_hooks.popitem()[1]() # Wait for all ACKs and stop the loop await mqtt_client.async_disconnect() + + # Cleanup MQTT client availability + hass.data.pop(DATA_MQTT_AVAILABLE, None) # Store remaining subscriptions to be able to restore or reload them # when the entry is set up again if subscriptions := mqtt_client.subscriptions: diff --git a/homeassistant/components/mqtt/const.py b/homeassistant/components/mqtt/const.py index fe7c6eb4cf0..c91c54a79a4 100644 --- a/homeassistant/components/mqtt/const.py +++ b/homeassistant/components/mqtt/const.py @@ -35,6 +35,7 @@ CONF_CLIENT_CERT = "client_cert" CONF_TLS_INSECURE = "tls_insecure" DATA_MQTT = "mqtt" +DATA_MQTT_AVAILABLE = "mqtt_client_available" DEFAULT_PREFIX = "homeassistant" DEFAULT_BIRTH_WILL_TOPIC = DEFAULT_PREFIX + "/status" diff --git a/homeassistant/components/mqtt/util.py b/homeassistant/components/mqtt/util.py index 155756a58dc..896ba21f802 100644 --- a/homeassistant/components/mqtt/util.py +++ b/homeassistant/components/mqtt/util.py @@ -2,13 +2,16 @@ from __future__ import annotations +import asyncio import os from pathlib import Path import tempfile from typing import Any +import async_timeout import voluptuous as vol +from homeassistant.config_entries import ConfigEntryState from homeassistant.core import HomeAssistant from homeassistant.helpers import config_validation as cv, template from homeassistant.helpers.typing import ConfigType @@ -22,6 +25,7 @@ from .const import ( CONF_CLIENT_CERT, CONF_CLIENT_KEY, DATA_MQTT, + DATA_MQTT_AVAILABLE, DEFAULT_ENCODING, DEFAULT_QOS, DEFAULT_RETAIN, @@ -29,6 +33,8 @@ from .const import ( ) from .models import MqttData +AVAILABILITY_TIMEOUT = 30.0 + TEMP_DIR_NAME = f"home-assistant-{DOMAIN}" _VALID_QOS_SCHEMA = vol.All(vol.Coerce(int), vol.In([0, 1, 2])) @@ -41,6 +47,37 @@ def mqtt_config_entry_enabled(hass: HomeAssistant) -> bool | None: return not bool(hass.config_entries.async_entries(DOMAIN)[0].disabled_by) +async def async_wait_for_mqtt_client(hass: HomeAssistant) -> bool: + """Wait for the MQTT client to become available. + + Waits when mqtt set up is in progress, + It is not needed that the client is connected. + Returns True if the mqtt client is available. + Returns False when the client is not available. + """ + if not mqtt_config_entry_enabled(hass): + return False + + entry = hass.config_entries.async_entries(DOMAIN)[0] + if entry.state == ConfigEntryState.LOADED: + return True + + state_reached_future: asyncio.Future[bool] + if DATA_MQTT_AVAILABLE not in hass.data: + hass.data[DATA_MQTT_AVAILABLE] = state_reached_future = asyncio.Future() + else: + state_reached_future = hass.data[DATA_MQTT_AVAILABLE] + if state_reached_future.done(): + return state_reached_future.result() + + try: + async with async_timeout.timeout(AVAILABILITY_TIMEOUT): + # Await the client setup or an error state was received + return await state_reached_future + except asyncio.TimeoutError: + return False + + def valid_topic(topic: Any) -> str: """Validate that this is a valid topic name/filter.""" validated_topic = cv.string(topic) diff --git a/homeassistant/components/mqtt_json/device_tracker.py b/homeassistant/components/mqtt_json/device_tracker.py index 2c67751551c..2b355eb68e6 100644 --- a/homeassistant/components/mqtt_json/device_tracker.py +++ b/homeassistant/components/mqtt_json/device_tracker.py @@ -47,6 +47,13 @@ async def async_setup_scanner( discovery_info: DiscoveryInfoType | None = None, ) -> bool: """Set up the MQTT JSON tracker.""" + # Make sure MQTT integration is enabled and the client is available + # We cannot count on dependencies as the device_tracker platform setup + # also will be triggered when mqtt is loading the `device_tracker` platform + if not await mqtt.async_wait_for_mqtt_client(hass): + _LOGGER.error("MQTT integration is not available") + return False + devices = config[CONF_DEVICES] qos = config[CONF_QOS] diff --git a/homeassistant/components/mqtt_room/sensor.py b/homeassistant/components/mqtt_room/sensor.py index b1b52e42fce..00441690b47 100644 --- a/homeassistant/components/mqtt_room/sensor.py +++ b/homeassistant/components/mqtt_room/sensor.py @@ -68,6 +68,12 @@ async def async_setup_platform( discovery_info: DiscoveryInfoType | None = None, ) -> None: """Set up MQTT room Sensor.""" + # Make sure MQTT integration is enabled and the client is available + # We cannot count on dependencies as the sensor platform setup + # also will be triggered when mqtt is loading the `sensor` platform + if not await mqtt.async_wait_for_mqtt_client(hass): + _LOGGER.error("MQTT integration is not available") + return async_add_entities( [ MQTTRoomSensor( diff --git a/homeassistant/components/snips/__init__.py b/homeassistant/components/snips/__init__.py index d4619fa3b3a..ac26a5bc8dc 100644 --- a/homeassistant/components/snips/__init__.py +++ b/homeassistant/components/snips/__init__.py @@ -90,12 +90,8 @@ SERVICE_SCHEMA_FEEDBACK = vol.Schema( async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Activate Snips component.""" - # Make sure MQTT is available and the entry is loaded - if not hass.config_entries.async_entries( - mqtt.DOMAIN - ) or not await hass.config_entries.async_wait_component( - hass.config_entries.async_entries(mqtt.DOMAIN)[0] - ): + # Make sure MQTT integration is enabled and the client is available + if not await mqtt.async_wait_for_mqtt_client(hass): _LOGGER.error("MQTT integration is not available") return False diff --git a/tests/components/manual_mqtt/test_alarm_control_panel.py b/tests/components/manual_mqtt/test_alarm_control_panel.py index 549fa995179..0df1114bf30 100644 --- a/tests/components/manual_mqtt/test_alarm_control_panel.py +++ b/tests/components/manual_mqtt/test_alarm_control_panel.py @@ -1506,3 +1506,24 @@ async def test_state_changes_are_published_to_mqtt( mqtt_mock.async_publish.assert_called_once_with( "alarm/state", STATE_ALARM_DISARMED, 0, True ) + + +async def test_no_mqtt(hass: HomeAssistant, caplog: pytest.LogCaptureFixture) -> None: + """Test publishing of MQTT messages when state changes.""" + assert await async_setup_component( + hass, + alarm_control_panel.DOMAIN, + { + alarm_control_panel.DOMAIN: { + "platform": "manual_mqtt", + "name": "test", + "state_topic": "alarm/state", + "command_topic": "alarm/command", + } + }, + ) + await hass.async_block_till_done() + + entity_id = "alarm_control_panel.test" + assert hass.states.get(entity_id) is None + assert "MQTT integration is not available" in caplog.text diff --git a/tests/components/mqtt/test_util.py b/tests/components/mqtt/test_util.py index fa9451ebf73..96577bd3fa4 100644 --- a/tests/components/mqtt/test_util.py +++ b/tests/components/mqtt/test_util.py @@ -1,12 +1,17 @@ """Test MQTT utils.""" +from collections.abc import Callable from random import getrandbits from unittest.mock import patch import pytest from homeassistant.components import mqtt -from homeassistant.core import HomeAssistant +from homeassistant.config_entries import ConfigEntryDisabler, ConfigEntryState +from homeassistant.core import CoreState, HomeAssistant + +from tests.common import MockConfigEntry +from tests.typing import MqttMockHAClient, MqttMockPahoClient @pytest.fixture(autouse=True) @@ -48,3 +53,163 @@ async def test_reading_non_exitisting_certificate_file() -> None: assert ( mqtt.util.migrate_certificate_file_to_content("/home/file_not_exists") is None ) + + +@patch("homeassistant.components.mqtt.PLATFORMS", []) +async def test_waiting_for_client_not_loaded( + hass: HomeAssistant, + mqtt_client_mock: MqttMockPahoClient, +) -> None: + """Test waiting for client while mqtt entry is not yet loaded.""" + hass.state = CoreState.starting + await hass.async_block_till_done() + + entry = MockConfigEntry( + domain=mqtt.DOMAIN, + data={"broker": "test-broker"}, + state=ConfigEntryState.NOT_LOADED, + ) + entry.add_to_hass(hass) + + unsubs: list[Callable[[], None]] = [] + + async def _async_just_in_time_subscribe() -> Callable[[], None]: + nonlocal unsub + assert await mqtt.async_wait_for_mqtt_client(hass) + # Awaiting a second time should work too and return True + assert await mqtt.async_wait_for_mqtt_client(hass) + unsubs.append(await mqtt.async_subscribe(hass, "test_topic", lambda msg: None)) + + # Simulate some integration waiting for the client to become available + hass.async_add_job(_async_just_in_time_subscribe) + hass.async_add_job(_async_just_in_time_subscribe) + hass.async_add_job(_async_just_in_time_subscribe) + hass.async_add_job(_async_just_in_time_subscribe) + + assert entry.state == ConfigEntryState.NOT_LOADED + assert await hass.config_entries.async_setup(entry.entry_id) + assert len(unsubs) == 4 + for unsub in unsubs: + unsub() + + +@patch("homeassistant.components.mqtt.PLATFORMS", []) +async def test_waiting_for_client_loaded( + hass: HomeAssistant, + mqtt_mock: MqttMockHAClient, +) -> None: + """Test waiting for client where mqtt entry is loaded.""" + unsub: Callable[[], None] | None = None + + async def _async_just_in_time_subscribe() -> Callable[[], None]: + nonlocal unsub + assert await mqtt.async_wait_for_mqtt_client(hass) + unsub = await mqtt.async_subscribe(hass, "test_topic", lambda msg: None) + + entry = hass.config_entries.async_entries(mqtt.DATA_MQTT)[0] + assert entry.state == ConfigEntryState.LOADED + + await _async_just_in_time_subscribe() + + assert unsub is not None + unsub() + + +async def test_waiting_for_client_entry_fails( + hass: HomeAssistant, + mqtt_client_mock: MqttMockPahoClient, +) -> None: + """Test waiting for client where mqtt entry is failing.""" + hass.state = CoreState.starting + await hass.async_block_till_done() + + entry = MockConfigEntry( + domain=mqtt.DOMAIN, + data={"broker": "test-broker"}, + state=ConfigEntryState.NOT_LOADED, + ) + entry.add_to_hass(hass) + + async def _async_just_in_time_subscribe() -> Callable[[], None]: + assert not await mqtt.async_wait_for_mqtt_client(hass) + + hass.async_add_job(_async_just_in_time_subscribe) + assert entry.state == ConfigEntryState.NOT_LOADED + with patch( + "homeassistant.components.mqtt.async_setup_entry", + side_effect=Exception, + ): + await hass.config_entries.async_setup(entry.entry_id) + assert entry.state == ConfigEntryState.SETUP_ERROR + + +async def test_waiting_for_client_setup_fails( + hass: HomeAssistant, + mqtt_client_mock: MqttMockPahoClient, +) -> None: + """Test waiting for client where mqtt entry is failing during setup.""" + hass.state = CoreState.starting + await hass.async_block_till_done() + + entry = MockConfigEntry( + domain=mqtt.DOMAIN, + data={"broker": "test-broker"}, + state=ConfigEntryState.NOT_LOADED, + ) + entry.add_to_hass(hass) + + async def _async_just_in_time_subscribe() -> Callable[[], None]: + assert not await mqtt.async_wait_for_mqtt_client(hass) + + hass.async_add_job(_async_just_in_time_subscribe) + assert entry.state == ConfigEntryState.NOT_LOADED + + # Simulate MQTT setup fails before the client would become available + mqtt_client_mock.connect.side_effect = Exception + assert not await hass.config_entries.async_setup(entry.entry_id) + assert entry.state == ConfigEntryState.SETUP_ERROR + + +@patch("homeassistant.components.mqtt.util.AVAILABILITY_TIMEOUT", 0.01) +async def test_waiting_for_client_timeout( + hass: HomeAssistant, +) -> None: + """Test waiting for client with timeout.""" + hass.state = CoreState.starting + await hass.async_block_till_done() + + entry = MockConfigEntry( + domain=mqtt.DOMAIN, + data={"broker": "test-broker"}, + state=ConfigEntryState.NOT_LOADED, + ) + entry.add_to_hass(hass) + + assert entry.state == ConfigEntryState.NOT_LOADED + # returns False after timeout + assert not await mqtt.async_wait_for_mqtt_client(hass) + + +async def test_waiting_for_client_with_disabled_entry( + hass: HomeAssistant, +) -> None: + """Test waiting for client with timeout.""" + hass.state = CoreState.starting + await hass.async_block_till_done() + + entry = MockConfigEntry( + domain=mqtt.DOMAIN, + data={"broker": "test-broker"}, + state=ConfigEntryState.NOT_LOADED, + ) + entry.add_to_hass(hass) + + # Disable MQTT config entry + await hass.config_entries.async_set_disabled_by( + entry.entry_id, ConfigEntryDisabler.USER + ) + + assert entry.state == ConfigEntryState.NOT_LOADED + + # returns False because entry is disabled + assert not await mqtt.async_wait_for_mqtt_client(hass) diff --git a/tests/components/mqtt_json/test_device_tracker.py b/tests/components/mqtt_json/test_device_tracker.py index 2cc5299061b..8423ccd8da2 100644 --- a/tests/components/mqtt_json/test_device_tracker.py +++ b/tests/components/mqtt_json/test_device_tracker.py @@ -11,6 +11,8 @@ from homeassistant.components.device_tracker.legacy import ( DOMAIN as DT_DOMAIN, YAML_DEVICES, ) +from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN +from homeassistant.config_entries import ConfigEntryDisabler from homeassistant.const import CONF_PLATFORM from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component @@ -39,6 +41,28 @@ async def setup_comp( os.remove(yaml_devices) +async def test_setup_fails_without_mqtt_being_setup( + hass: HomeAssistant, caplog: pytest.LogCaptureFixture +) -> None: + """Ensure mqtt is started when we setup the component.""" + # Simulate MQTT is was removed + mqtt_entry = hass.config_entries.async_entries(MQTT_DOMAIN)[0] + await hass.config_entries.async_unload(mqtt_entry.entry_id) + await hass.config_entries.async_set_disabled_by( + mqtt_entry.entry_id, ConfigEntryDisabler.USER + ) + + dev_id = "zanzito" + topic = "location/zanzito" + + await async_setup_component( + hass, + DT_DOMAIN, + {DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}}, + ) + assert "MQTT integration is not available" in caplog.text + + async def test_ensure_device_tracker_platform_validation(hass: HomeAssistant) -> None: """Test if platform validation was done.""" diff --git a/tests/components/mqtt_room/test_sensor.py b/tests/components/mqtt_room/test_sensor.py index 999bcebd174..1d6b2980ab2 100644 --- a/tests/components/mqtt_room/test_sensor.py +++ b/tests/components/mqtt_room/test_sensor.py @@ -3,6 +3,8 @@ import datetime import json from unittest.mock import patch +import pytest + from homeassistant.components.mqtt import CONF_QOS, CONF_STATE_TOPIC, DEFAULT_QOS import homeassistant.components.sensor as sensor from homeassistant.const import ( @@ -56,6 +58,28 @@ async def assert_distance(hass, distance): assert state.attributes.get("distance") == distance +async def test_no_mqtt(hass: HomeAssistant, caplog: pytest.LogCaptureFixture) -> None: + """Test no mqtt available.""" + assert await async_setup_component( + hass, + sensor.DOMAIN, + { + sensor.DOMAIN: { + CONF_PLATFORM: "mqtt_room", + CONF_NAME: NAME, + CONF_DEVICE_ID: DEVICE_ID, + CONF_STATE_TOPIC: "room_presence", + CONF_QOS: DEFAULT_QOS, + CONF_TIMEOUT: 5, + } + }, + ) + await hass.async_block_till_done() + state = hass.states.get(SENSOR_STATE) + assert state is None + assert "MQTT integration is not available" in caplog.text + + async def test_room_update(hass: HomeAssistant, mqtt_mock: MqttMockHAClient) -> None: """Test the updating between rooms.""" assert await async_setup_component(