diff --git a/tests/components/mqtt/conftest.py b/tests/components/mqtt/conftest.py index 5a1f65667cf..774785bb42a 100644 --- a/tests/components/mqtt/conftest.py +++ b/tests/components/mqtt/conftest.py @@ -10,6 +10,7 @@ from typing_extensions import AsyncGenerator, Generator from homeassistant.components import mqtt from homeassistant.components.mqtt.models import MessageCallbackType, ReceiveMessage +from homeassistant.components.mqtt.util import EnsureJobAfterCooldown from homeassistant.const import EVENT_HOMEASSISTANT_STARTED from homeassistant.core import HomeAssistant, callback @@ -49,6 +50,29 @@ def mock_temp_dir(temp_dir_prefix: str) -> Generator[str]: yield mocked_temp_dir +@pytest.fixture +def mock_debouncer(hass: HomeAssistant) -> Generator[asyncio.Event]: + """Mock EnsureJobAfterCooldown. + + Returns an asyncio.Event that allows to await the debouncer task to be finished. + """ + task_done = asyncio.Event() + + class MockDeboncer(EnsureJobAfterCooldown): + """Mock the MQTT client (un)subscribe debouncer.""" + + async def _async_job(self) -> None: + """Execute after a cooldown period.""" + await super()._async_job() + task_done.set() + + # We mock the import of EnsureJobAfterCooldown in client.py + with patch( + "homeassistant.components.mqtt.client.EnsureJobAfterCooldown", MockDeboncer + ): + yield task_done + + @pytest.fixture async def setup_with_birth_msg_client_mock( hass: HomeAssistant, diff --git a/tests/components/mqtt/test_init.py b/tests/components/mqtt/test_init.py index 2c3ca31bff9..231379601c6 100644 --- a/tests/components/mqtt/test_init.py +++ b/tests/components/mqtt/test_init.py @@ -140,13 +140,13 @@ async def test_mqtt_connects_on_home_assistant_mqtt_setup( async def test_mqtt_does_not_disconnect_on_home_assistant_stop( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test if client is not disconnected on HA stop.""" mqtt_client_mock = setup_with_birth_msg_client_mock hass.bus.fire(EVENT_HOMEASSISTANT_STOP) - await hass.async_block_till_done() - await hass.async_block_till_done() + await mock_debouncer.wait() assert mqtt_client_mock.disconnect.call_count == 0 @@ -1085,6 +1085,7 @@ async def test_subscribe_mqtt_config_entry_disabled( async def test_subscribe_and_resubscribe( hass: HomeAssistant, client_debug_log: None, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, @@ -1095,15 +1096,16 @@ async def test_subscribe_and_resubscribe( patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.4), patch("homeassistant.components.mqtt.client.UNSUBSCRIBE_COOLDOWN", 0.4), ): + mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls) # This unsub will be un-done with the following subscribe # unsubscribe should not be called at the broker unsub() unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls) + await mock_debouncer.wait() + mock_debouncer.clear() async_fire_mqtt_message(hass, "test-topic", "test-payload") - await asyncio.sleep(0.1) - await hass.async_block_till_done(wait_background_tasks=True) assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "test-topic" @@ -1111,38 +1113,41 @@ async def test_subscribe_and_resubscribe( # assert unsubscribe was not called mqtt_client_mock.unsubscribe.assert_not_called() + mock_debouncer.clear() unsub() - await asyncio.sleep(0.1) - await hass.async_block_till_done(wait_background_tasks=True) + await mock_debouncer.wait() mqtt_client_mock.unsubscribe.assert_called_once_with(["test-topic"]) async def test_subscribe_topic_non_async( hass: HomeAssistant, + mock_debouncer: asyncio.Event, mqtt_mock_entry: MqttMockHAClientGenerator, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, ) -> None: """Test the subscription of a topic using the non-async function.""" await mqtt_mock_entry() + await mock_debouncer.wait() + mock_debouncer.clear() unsub = await hass.async_add_executor_job( mqtt.subscribe, hass, "test-topic", record_calls ) - await hass.async_block_till_done() + await mock_debouncer.wait() async_fire_mqtt_message(hass, "test-topic", "test-payload") - await hass.async_block_till_done() assert len(recorded_calls) == 1 assert recorded_calls[0].topic == "test-topic" assert recorded_calls[0].payload == "test-payload" + mock_debouncer.clear() await hass.async_add_executor_job(unsub) + await mock_debouncer.wait() async_fire_mqtt_message(hass, "test-topic", "test-payload") - await hass.async_block_till_done() assert len(recorded_calls) == 1 @@ -1417,11 +1422,9 @@ async def test_subscribe_special_characters( assert recorded_calls[0].payload == payload -@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) -@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) -@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) async def test_subscribe_same_topic( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test subscribing to same topic twice and simulate retained messages. @@ -1442,25 +1445,22 @@ async def test_subscribe_same_topic( calls_b.append(msg) mqtt_client_mock.reset_mock() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", _callback_a, qos=0) # Simulate a non retained message after the first subscription async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) - async_fire_time_changed(hass, utcnow() + timedelta(seconds=1)) - await hass.async_block_till_done() + await mock_debouncer.wait() assert len(calls_a) == 1 mqtt_client_mock.subscribe.assert_called() calls_a = [] mqtt_client_mock.reset_mock() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) await hass.async_block_till_done() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", _callback_b, qos=1) # Simulate an other non retained message after the second subscription async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) - async_fire_time_changed(hass, utcnow() + timedelta(seconds=1)) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=1)) - await hass.async_block_till_done() + await mock_debouncer.wait() # Both subscriptions should receive updates assert len(calls_a) == 1 assert len(calls_b) == 1 @@ -1469,6 +1469,7 @@ async def test_subscribe_same_topic( async def test_replaying_payload_same_topic( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test replaying retained messages. @@ -1491,21 +1492,20 @@ async def test_replaying_payload_same_topic( calls_b.append(msg) mqtt_client_mock.reset_mock() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", _callback_a) - await hass.async_block_till_done() + await mock_debouncer.wait() async_fire_mqtt_message( hass, "test/state", "online", qos=0, retain=True ) # Simulate a (retained) message played back - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await asyncio.sleep(0) - await hass.async_block_till_done() - assert len(calls_a) == 1 mqtt_client_mock.subscribe.assert_called() calls_a = [] mqtt_client_mock.reset_mock() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", _callback_b) + await mock_debouncer.wait() # Simulate edge case where non retained message was received # after subscription at HA but before the debouncer delay was passed. @@ -1516,12 +1516,6 @@ async def test_replaying_payload_same_topic( # Simulate a (retained) message played back on new subscriptions async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) - # Make sure the debouncer delay was passed - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await asyncio.sleep(0) - await hass.async_block_till_done() - # The current subscription only received the message without retain flag assert len(calls_a) == 1 assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False) @@ -1542,10 +1536,6 @@ async def test_replaying_payload_same_topic( # After connecting the retain flag will not be set, even if the # payload published was retained, we cannot see that async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await asyncio.sleep(0) - await hass.async_block_till_done() assert len(calls_a) == 1 assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False) assert len(calls_b) == 1 @@ -1556,18 +1546,13 @@ async def test_replaying_payload_same_topic( calls_b = [] mqtt_client_mock.reset_mock() mqtt_client_mock.on_disconnect(None, None, 0) + + mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, None, 0) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await asyncio.sleep(0) - await hass.async_block_till_done() + await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() # Simulate a (retained) message played back after reconnecting async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await asyncio.sleep(0) - await hass.async_block_till_done(wait_background_tasks=True) # Both subscriptions now should replay the retained message assert len(calls_a) == 1 assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) @@ -1575,11 +1560,9 @@ async def test_replaying_payload_same_topic( assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=True) -@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) -@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) -@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) async def test_replaying_payload_after_resubscribing( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test replaying and filtering retained messages after resubscribing. @@ -1597,22 +1580,18 @@ async def test_replaying_payload_after_resubscribing( calls_a.append(msg) mqtt_client_mock.reset_mock() + mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await hass.async_block_till_done() - await hass.async_block_till_done() + await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() # Simulate a (retained) message played back async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) - await hass.async_block_till_done() assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) calls_a.clear() # Test we get updates async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=False) - await hass.async_block_till_done() assert help_assert_message(calls_a[0], "test/state", "offline", qos=0, retain=False) calls_a.clear() @@ -1622,24 +1601,20 @@ async def test_replaying_payload_after_resubscribing( assert len(calls_a) == 0 # Unsubscribe an resubscribe again + mock_debouncer.clear() unsub() unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await hass.async_block_till_done() + await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() # Simulate we can receive a (retained) played back message again async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True) - await hass.async_block_till_done() assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True) -@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0) -@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) -@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) async def test_replaying_payload_wildcard_topic( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test replaying retained messages. @@ -1663,28 +1638,24 @@ async def test_replaying_payload_wildcard_topic( calls_b.append(msg) mqtt_client_mock.reset_mock() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/#", _callback_a) + await mock_debouncer.wait() # Simulate (retained) messages being played back on new subscriptions async_fire_mqtt_message(hass, "test/state1", "new_value_1", qos=0, retain=True) async_fire_mqtt_message(hass, "test/state2", "new_value_2", qos=0, retain=True) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown - await asyncio.sleep(0) - await hass.async_block_till_done() assert len(calls_a) == 2 mqtt_client_mock.subscribe.assert_called() calls_a = [] mqtt_client_mock.reset_mock() # resubscribe to the wild card topic again + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/#", _callback_b) + await mock_debouncer.wait() # Simulate (retained) messages being played back on new subscriptions async_fire_mqtt_message(hass, "test/state1", "initial_value_1", qos=0, retain=True) async_fire_mqtt_message(hass, "test/state2", "initial_value_2", qos=0, retain=True) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown - await asyncio.sleep(0) - await hass.async_block_till_done() # The retained messages playback should only be processed for the new subscriptions assert len(calls_a) == 0 assert len(calls_b) == 2 @@ -1697,8 +1668,6 @@ async def test_replaying_payload_wildcard_topic( # Simulate new messages being received async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=False) async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=False) - await hass.async_block_till_done() - await asyncio.sleep(0) assert len(calls_a) == 2 assert len(calls_b) == 2 @@ -1707,20 +1676,16 @@ async def test_replaying_payload_wildcard_topic( calls_b = [] mqtt_client_mock.reset_mock() mqtt_client_mock.on_disconnect(None, None, 0) + + mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, None, 0) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown - await asyncio.sleep(0) - await hass.async_block_till_done() + await mock_debouncer.wait() + mqtt_client_mock.subscribe.assert_called() # Simulate the (retained) messages are played back after reconnecting # for all subscriptions async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=True) async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=True) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown - await asyncio.sleep(0) - await hass.async_block_till_done(wait_background_tasks=True) # Both subscriptions should replay assert len(calls_a) == 2 assert len(calls_b) == 2 @@ -1728,29 +1693,32 @@ async def test_replaying_payload_wildcard_topic( async def test_not_calling_unsubscribe_with_active_subscribers( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test not calling unsubscribe() when other subscribers are active.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.reset_mock() + mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test/state", record_calls, 2) await mqtt.async_subscribe(hass, "test/state", record_calls, 1) - await hass.async_block_till_done() - await asyncio.sleep(0) - await hass.async_block_till_done() + await mock_debouncer.wait() assert mqtt_client_mock.subscribe.called + mock_debouncer.clear() unsub() await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown await hass.async_block_till_done(wait_background_tasks=True) + async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown assert not mqtt_client_mock.unsubscribe.called + assert not mock_debouncer.is_set() async def test_not_calling_subscribe_when_unsubscribed_within_cooldown( hass: HomeAssistant, - setup_with_birth_msg_client_mock: MqttMockPahoClient, + mock_debouncer: asyncio.Event, + mqtt_mock_entry: MqttMockHAClientGenerator, record_calls: MessageCallbackType, ) -> None: """Test not calling subscribe() when it is unsubscribed. @@ -1758,18 +1726,22 @@ async def test_not_calling_subscribe_when_unsubscribed_within_cooldown( Make sure subscriptions are cleared if unsubscribed before the subscribe cool down period has ended. """ - mqtt_client_mock = setup_with_birth_msg_client_mock - mqtt_client_mock.subscribe.reset_mock() + mqtt_mock = await mqtt_mock_entry() + mqtt_client_mock = mqtt_mock._mqttc + await mock_debouncer.wait() + mock_debouncer.clear() + mqtt_client_mock.subscribe.reset_mock() unsub = await mqtt.async_subscribe(hass, "test/state", record_calls) unsub() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown - await hass.async_block_till_done() + await mock_debouncer.wait() + # The debouncer executes without an pending subscribes assert not mqtt_client_mock.subscribe.called async def test_unsubscribe_race( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test not calling unsubscribe() when other subscribers are active.""" @@ -1786,15 +1758,14 @@ async def test_unsubscribe_race( calls_b.append(msg) mqtt_client_mock.reset_mock() + + mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a) unsub() await mqtt.async_subscribe(hass, "test/state", _callback_b) - await asyncio.sleep(0) - await hass.async_block_till_done() + await mock_debouncer.wait() async_fire_mqtt_message(hass, "test/state", "online") - await asyncio.sleep(0) - await hass.async_block_till_done() assert not calls_a assert calls_b @@ -1825,6 +1796,7 @@ async def test_unsubscribe_race( ) async def test_restore_subscriptions_on_reconnect( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: @@ -1833,18 +1805,18 @@ async def test_restore_subscriptions_on_reconnect( mqtt_client_mock.reset_mock() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", record_calls) async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown - await asyncio.sleep(0) - await hass.async_block_till_done() + await mock_debouncer.wait() assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock) mqtt_client_mock.reset_mock() mqtt_client_mock.on_disconnect(None, None, 0) + + mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, None, 0) - await hass.async_block_till_done() - await asyncio.sleep(0.1) - await hass.async_block_till_done(wait_background_tasks=True) + await mock_debouncer.wait() assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock) @@ -1854,20 +1826,19 @@ async def test_restore_subscriptions_on_reconnect( ) async def test_restore_all_active_subscriptions_on_reconnect( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, - freezer: FrozenDateTimeFactory, ) -> None: """Test active subscriptions are restored correctly on reconnect.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.reset_mock() + mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test/state", record_calls, qos=2) await mqtt.async_subscribe(hass, "test/state", record_calls, qos=1) await mqtt.async_subscribe(hass, "test/state", record_calls, qos=0) - await hass.async_block_till_done() - freezer.tick(3) - async_fire_time_changed(hass) # cooldown - await hass.async_block_till_done() + # cooldown + await mock_debouncer.wait() # the subscription with the highest QoS should survive expected = [ @@ -1876,68 +1847,54 @@ async def test_restore_all_active_subscriptions_on_reconnect( assert mqtt_client_mock.subscribe.mock_calls == expected unsub() - await hass.async_block_till_done() assert mqtt_client_mock.unsubscribe.call_count == 0 mqtt_client_mock.on_disconnect(None, None, 0) - await hass.async_block_till_done() + + mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, None, 0) - freezer.tick(3) - async_fire_time_changed(hass) # cooldown - await hass.async_block_till_done() + # wait for cooldown + await mock_debouncer.wait() expected.append(call([("test/state", 1)])) for expected_call in expected: assert mqtt_client_mock.subscribe.hass_call(expected_call) - freezer.tick(3) - async_fire_time_changed(hass) # cooldown - await hass.async_block_till_done() - freezer.tick(3) - async_fire_time_changed(hass) # cooldown - await hass.async_block_till_done(wait_background_tasks=True) - @pytest.mark.parametrize( "mqtt_config_entry_data", [{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_DISCOVERY: False}], ) -@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 1.0) -@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0) -@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 1.0) async def test_subscribed_at_highest_qos( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, - freezer: FrozenDateTimeFactory, ) -> None: """Test the highest qos as assigned when subscribing to the same topic.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.reset_mock() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", record_calls, qos=0) await hass.async_block_till_done() - freezer.tick(5) - async_fire_time_changed(hass) # cooldown - await hass.async_block_till_done() + # cooldown + await mock_debouncer.wait() assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock) mqtt_client_mock.reset_mock() - freezer.tick(5) - async_fire_time_changed(hass) # cooldown - await hass.async_block_till_done() - await hass.async_block_till_done() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state", record_calls, qos=1) await mqtt.async_subscribe(hass, "test/state", record_calls, qos=2) - await hass.async_block_till_done() - freezer.tick(5) - async_fire_time_changed(hass) # cooldown - await hass.async_block_till_done() + # cooldown + await mock_debouncer.wait() + # the subscription with the highest QoS should survive assert help_all_subscribe_calls(mqtt_client_mock) == [("test/state", 2)] async def test_reload_entry_with_restored_subscriptions( hass: HomeAssistant, + mock_debouncer: asyncio.Event, mqtt_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, recorded_calls: list[ReceiveMessage], @@ -1950,13 +1907,15 @@ async def test_reload_entry_with_restored_subscriptions( with patch("homeassistant.config.load_yaml_config_file", return_value={}): await hass.config_entries.async_setup(entry.entry_id) + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test-topic", record_calls) await mqtt.async_subscribe(hass, "wild/+/card", record_calls) + # cooldown + await mock_debouncer.wait() async_fire_mqtt_message(hass, "test-topic", "test-payload") async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload") - await hass.async_block_till_done() assert len(recorded_calls) == 2 assert recorded_calls[0].topic == "test-topic" assert recorded_calls[0].payload == "test-payload" @@ -1967,13 +1926,14 @@ async def test_reload_entry_with_restored_subscriptions( # Reload the entry with patch("homeassistant.config.load_yaml_config_file", return_value={}): assert await hass.config_entries.async_reload(entry.entry_id) + mock_debouncer.clear() assert entry.state is ConfigEntryState.LOADED - await hass.async_block_till_done() + # cooldown + await mock_debouncer.wait() async_fire_mqtt_message(hass, "test-topic", "test-payload2") async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload2") - await hass.async_block_till_done() assert len(recorded_calls) == 2 assert recorded_calls[0].topic == "test-topic" assert recorded_calls[0].payload == "test-payload2" @@ -1984,13 +1944,14 @@ async def test_reload_entry_with_restored_subscriptions( # Reload the entry again with patch("homeassistant.config.load_yaml_config_file", return_value={}): assert await hass.config_entries.async_reload(entry.entry_id) + mock_debouncer.clear() assert entry.state is ConfigEntryState.LOADED - await hass.async_block_till_done() + # cooldown + await mock_debouncer.wait() async_fire_mqtt_message(hass, "test-topic", "test-payload3") async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload3") - await hass.async_block_till_done() assert len(recorded_calls) == 2 assert recorded_calls[0].topic == "test-topic" assert recorded_calls[0].payload == "test-payload3" @@ -2079,9 +2040,9 @@ async def test_handle_mqtt_on_callback_after_timeout( """Test receiving an ACK after a timeout.""" mqtt_mock = await mqtt_mock_entry() # Simulate the mid future getting a timeout - mqtt_mock()._async_get_mid_future(100).set_exception(asyncio.TimeoutError) - # Simulate an ACK for mid == 100, being received after the timeout - mqtt_client_mock.on_publish(mqtt_client_mock, None, 100) + mqtt_mock()._async_get_mid_future(101).set_exception(asyncio.TimeoutError) + # Simulate an ACK for mid == 101, being received after the timeout + mqtt_client_mock.on_publish(mqtt_client_mock, None, 101) await hass.async_block_till_done() assert "No ACK from MQTT server" not in caplog.text assert "InvalidStateError" not in caplog.text @@ -2119,20 +2080,18 @@ async def test_subscribe_error( mqtt_client_mock.reset_mock() # simulate client is not connected error before subscribing mqtt_client_mock.subscribe.side_effect = lambda *args: (4, None) - with patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0): - await mqtt.async_subscribe(hass, "some-topic", record_calls) - while mqtt_client_mock.subscribe.call_count == 0: - await hass.async_block_till_done() + await mqtt.async_subscribe(hass, "some-topic", record_calls) + while mqtt_client_mock.subscribe.call_count == 0: await hass.async_block_till_done() - await hass.async_block_till_done() - assert ( - "Error talking to MQTT: The client is not currently connected." - in caplog.text - ) + await hass.async_block_till_done() + assert ( + "Error talking to MQTT: The client is not currently connected." in caplog.text + ) async def test_handle_message_callback( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test for handling an incoming message callback.""" @@ -2146,12 +2105,12 @@ async def test_handle_message_callback( msg = ReceiveMessage( "some-topic", b"test-payload", 1, False, "some-topic", datetime.now() ) + mock_debouncer.clear() await mqtt.async_subscribe(hass, "some-topic", _callback) + await mock_debouncer.wait() mqtt_client_mock.reset_mock() mqtt_client_mock.on_message(None, None, msg) - await hass.async_block_till_done() - await hass.async_block_till_done() assert len(callbacks) == 1 assert callbacks[0].topic == "some-topic" assert callbacks[0].qos == 1 @@ -2239,7 +2198,7 @@ async def test_setup_mqtt_client_protocol( @patch("homeassistant.components.mqtt.client.TIMEOUT_ACK", 0.2) async def test_handle_mqtt_timeout_on_callback( - hass: HomeAssistant, caplog: pytest.LogCaptureFixture + hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_debouncer: asyncio.Event ) -> None: """Test publish without receiving an ACK callback.""" mid = 0 @@ -2247,7 +2206,7 @@ async def test_handle_mqtt_timeout_on_callback( class FakeInfo: """Returns a simulated client publish response.""" - mid = 100 + mid = 102 rc = 0 with patch( @@ -2264,7 +2223,9 @@ async def test_handle_mqtt_timeout_on_callback( # We want to simulate the publish behaviour MQTT client mock_client = mock_client.return_value mock_client.publish.return_value = FakeInfo() + # Mock we get a mid and rc=0 mock_client.subscribe.side_effect = _mock_ack + mock_client.unsubscribe.side_effect = _mock_ack mock_client.connect = MagicMock( return_value=0, side_effect=lambda *args, **kwargs: hass.loop.call_soon_threadsafe( @@ -2278,6 +2239,7 @@ async def test_handle_mqtt_timeout_on_callback( entry.add_to_hass(hass) # Set up the integration + mock_debouncer.clear() assert await hass.config_entries.async_setup(entry.entry_id) # Now call we publish without simulating and ACK callback @@ -2286,6 +2248,12 @@ async def test_handle_mqtt_timeout_on_callback( # There is no ACK so we should see a timeout in the log after publishing assert len(mock_client.publish.mock_calls) == 1 assert "No ACK from MQTT server" in caplog.text + # Ensure we stop lingering background tasks + await hass.config_entries.async_unload(entry.entry_id) + # Assert we did not have any completed subscribes, + # because the debouncer subscribe job failed to receive an ACK, + # and the time auto caused the debouncer job to fail. + assert not mock_debouncer.is_set() async def test_setup_raises_config_entry_not_ready_if_no_connect_broker( @@ -2391,26 +2359,22 @@ async def test_tls_version( @patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) async def test_custom_birth_message( hass: HomeAssistant, + mock_debouncer: asyncio.Event, mqtt_config_entry_data: dict[str, Any], mqtt_client_mock: MqttMockPahoClient, ) -> None: """Test sending birth message.""" - birth = asyncio.Event() entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data) entry.add_to_hass(hass) hass.config.components.add(mqtt.DOMAIN) assert await hass.config_entries.async_setup(entry.entry_id) + mock_debouncer.clear() hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) - - @callback - def wait_birth(msg: ReceiveMessage) -> None: - """Handle birth message.""" - birth.set() - - await mqtt.async_subscribe(hass, "birth", wait_birth) - await hass.async_block_till_done() - await birth.wait() + # discovery cooldown + await mock_debouncer.wait() + # Wait for publish call to finish + await hass.async_block_till_done(wait_background_tasks=True) mqtt_client_mock.publish.assert_called_with("birth", "birth", 0, False) @@ -2439,6 +2403,8 @@ async def test_default_birth_message( @patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0) async def test_no_birth_message( hass: HomeAssistant, + record_calls: MessageCallbackType, + mock_debouncer: asyncio.Event, mqtt_config_entry_data: dict[str, Any], mqtt_client_mock: MqttMockPahoClient, ) -> None: @@ -2446,26 +2412,19 @@ async def test_no_birth_message( entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data) entry.add_to_hass(hass) hass.config.components.add(mqtt.DOMAIN) + mock_debouncer.clear() assert await hass.config_entries.async_setup(entry.entry_id) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await hass.async_block_till_done() - mqtt_client_mock.reset_mock() - - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await hass.async_block_till_done() + # Wait for discovery cooldown + await mock_debouncer.wait() + # Ensure any publishing could have been processed + await hass.async_block_till_done(wait_background_tasks=True) mqtt_client_mock.publish.assert_not_called() - @callback - def msg_callback(msg: ReceiveMessage) -> None: - """Handle callback.""" - mqtt_client_mock.reset_mock() - await mqtt.async_subscribe(hass, "homeassistant/some-topic", msg_callback) - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=5)) - await hass.async_block_till_done() + mock_debouncer.clear() + await mqtt.async_subscribe(hass, "homeassistant/some-topic", record_calls) + # Wait for discovery cooldown + await mock_debouncer.wait() mqtt_client_mock.subscribe.assert_called() @@ -2487,7 +2446,6 @@ async def test_delayed_birth_message( entry.add_to_hass(hass) hass.config.components.add(mqtt.DOMAIN) assert await hass.config_entries.async_setup(entry.entry_id) - await hass.async_block_till_done() @callback def wait_birth(msg: ReceiveMessage) -> None: @@ -2495,7 +2453,6 @@ async def test_delayed_birth_message( birth.set() await mqtt.async_subscribe(hass, "homeassistant/status", wait_birth) - await hass.async_block_till_done() with pytest.raises(TimeoutError): await asyncio.wait_for(birth.wait(), 0.05) assert not mqtt_client_mock.publish.called @@ -2595,26 +2552,27 @@ async def test_no_will_message( ) async def test_mqtt_subscribes_topics_on_connect( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test subscription to topic on connect.""" mqtt_client_mock = setup_with_birth_msg_client_mock + mock_debouncer.clear() await mqtt.async_subscribe(hass, "topic/test", record_calls) await mqtt.async_subscribe(hass, "home/sensor", record_calls, 2) await mqtt.async_subscribe(hass, "still/pending", record_calls) await mqtt.async_subscribe(hass, "still/pending", record_calls, 1) + await mock_debouncer.wait() mqtt_client_mock.on_disconnect(Mock(), None, 0) mqtt_client_mock.reset_mock() + mock_debouncer.clear() mqtt_client_mock.on_connect(Mock(), None, 0, 0) - - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) - await hass.async_block_till_done(wait_background_tasks=True) + await mock_debouncer.wait() subscribe_calls = help_all_subscribe_calls(mqtt_client_mock) assert ("topic/test", 0) in subscribe_calls @@ -2628,17 +2586,18 @@ async def test_mqtt_subscribes_topics_on_connect( ) async def test_mqtt_subscribes_in_single_call( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: """Test bundled client subscription to topic.""" mqtt_client_mock = setup_with_birth_msg_client_mock mqtt_client_mock.subscribe.reset_mock() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "topic/test", record_calls) await mqtt.async_subscribe(hass, "home/sensor", record_calls) # Make sure the debouncer finishes - await asyncio.sleep(0) - await hass.async_block_till_done(wait_background_tasks=True) + await mock_debouncer.wait() assert mqtt_client_mock.subscribe.call_count == 1 # Assert we have a single subscription call with both subscriptions @@ -2653,6 +2612,7 @@ async def test_mqtt_subscribes_in_single_call( @patch("homeassistant.components.mqtt.client.MAX_UNSUBSCRIBES_PER_CALL", 2) async def test_mqtt_subscribes_and_unsubscribes_in_chunks( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, record_calls: MessageCallbackType, ) -> None: @@ -2661,13 +2621,13 @@ async def test_mqtt_subscribes_and_unsubscribes_in_chunks( mqtt_client_mock.subscribe.reset_mock() unsub_tasks: list[CALLBACK_TYPE] = [] + mock_debouncer.clear() unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test1", record_calls)) unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor1", record_calls)) unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test2", record_calls)) unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor2", record_calls)) # Make sure the debouncer finishes - await asyncio.sleep(0.1) - await hass.async_block_till_done(wait_background_tasks=True) + await mock_debouncer.wait() assert mqtt_client_mock.subscribe.call_count == 2 # Assert we have a 2 subscription calls with both 2 subscriptions @@ -2675,12 +2635,11 @@ async def test_mqtt_subscribes_and_unsubscribes_in_chunks( assert len(mqtt_client_mock.subscribe.mock_calls[1][1][0]) == 2 # Unsubscribe all topics + mock_debouncer.clear() for task in unsub_tasks: task() - await hass.async_block_till_done() # Make sure the debouncer finishes - await asyncio.sleep(0.1) - await hass.async_block_till_done(wait_background_tasks=True) + await mock_debouncer.wait() assert mqtt_client_mock.unsubscribe.call_count == 2 # Assert we have a 2 unsubscribe calls with both 2 topic @@ -2748,6 +2707,7 @@ async def test_message_callback_exception_gets_logged( async def test_message_partial_callback_exception_gets_logged( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test exception raised by message handler.""" @@ -2765,15 +2725,13 @@ async def test_message_partial_callback_exception_gets_logged( """Partial callback handler.""" msg_callback(msg) + mock_debouncer.clear() await mqtt.async_subscribe( hass, "test-topic", partial(parial_handler, bad_handler, {"some_attr"}) ) - await hass.async_block_till_done(wait_background_tasks=True) + await mock_debouncer.wait() async_fire_mqtt_message(hass, "test-topic", "test") await hass.async_block_till_done() - await hass.async_block_till_done() - await asyncio.sleep(0) - await hass.async_block_till_done(wait_background_tasks=True) assert ( "Exception in bad_handler when handling msg on 'test-topic':" @@ -3500,6 +3458,7 @@ async def test_publish_json_from_template( async def test_subscribe_connection_status( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test connextion status subscription.""" @@ -3533,8 +3492,9 @@ async def test_subscribe_connection_status( await hass.async_block_till_done() # Mock connect status + mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, 0, 0) - await hass.async_block_till_done() + await mock_debouncer.wait() assert mqtt.is_connected(hass) is True # Mock disconnect status @@ -3547,9 +3507,9 @@ async def test_subscribe_connection_status( unsub_async() # Mock connect status + mock_debouncer.clear() mqtt_client_mock.on_connect(None, None, 0, 0) - await asyncio.sleep(0) - await hass.async_block_till_done() + await mock_debouncer.wait() assert mqtt.is_connected(hass) is True # Check calls @@ -3584,7 +3544,7 @@ async def test_unload_config_entry( new_mqtt_config_entry = mqtt_config_entry mqtt_client_mock.publish.assert_any_call("just_in_time", "published", 0, False) assert new_mqtt_config_entry.state is ConfigEntryState.NOT_LOADED - await hass.async_block_till_done() + await hass.async_block_till_done(wait_background_tasks=True) assert not hass.services.has_service(mqtt.DOMAIN, "dump") assert not hass.services.has_service(mqtt.DOMAIN, "publish") assert "No ACK from MQTT server" not in caplog.text @@ -4236,6 +4196,7 @@ async def test_auto_reconnect( async def test_server_sock_connect_and_disconnect( hass: HomeAssistant, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, recorded_calls: list[ReceiveMessage], record_calls: MessageCallbackType, @@ -4257,14 +4218,19 @@ async def test_server_sock_connect_and_disconnect( server.close() # mock the server closing the connection on us + mock_debouncer.clear() unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls) + await mock_debouncer.wait() mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_CONN_LOST mqtt_client_mock.on_socket_unregister_write(mqtt_client_mock, None, client) mqtt_client_mock.on_socket_close(mqtt_client_mock, None, client) mqtt_client_mock.on_disconnect(mqtt_client_mock, None, client) await hass.async_block_till_done() + mock_debouncer.clear() unsub() + await hass.async_block_till_done() + assert not mock_debouncer.is_set() # Should have failed assert len(recorded_calls) == 0 diff --git a/tests/components/mqtt/test_util.py b/tests/components/mqtt/test_util.py index 955fc88448c..a3802de69da 100644 --- a/tests/components/mqtt/test_util.py +++ b/tests/components/mqtt/test_util.py @@ -26,15 +26,15 @@ from tests.typing import MqttMockHAClient, MqttMockPahoClient async def test_canceling_debouncer_on_shutdown( hass: HomeAssistant, record_calls: MessageCallbackType, + mock_debouncer: asyncio.Event, setup_with_birth_msg_client_mock: MqttMockPahoClient, ) -> None: """Test canceling the debouncer when HA shuts down.""" mqtt_client_mock = setup_with_birth_msg_client_mock - + # Mock we are past initial setup + await mock_debouncer.wait() with patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 2): - await hass.async_block_till_done() - async_fire_time_changed(hass, utcnow() + timedelta(seconds=5)) - await hass.async_block_till_done() + mock_debouncer.clear() await mqtt.async_subscribe(hass, "test/state1", record_calls) async_fire_time_changed(hass, utcnow() + timedelta(seconds=0.1)) # Stop HA so the scheduled debouncer task will be canceled @@ -47,9 +47,10 @@ async def test_canceling_debouncer_on_shutdown( await mqtt.async_subscribe(hass, "test/state4", record_calls) async_fire_time_changed(hass, utcnow() + timedelta(seconds=0.1)) await mqtt.async_subscribe(hass, "test/state5", record_calls) - async_fire_time_changed(hass, utcnow() + timedelta(seconds=0.1)) - await hass.async_block_till_done() - + async_fire_time_changed(hass, utcnow() + timedelta(seconds=5)) + await hass.async_block_till_done(wait_background_tasks=True) + # Assert the debouncer subscribe job was not executed + assert not mock_debouncer.is_set() mqtt_client_mock.subscribe.assert_not_called() # Note thet the broker connection will not be disconnected gracefully