mirror of
https://github.com/home-assistant/core.git
synced 2025-07-19 03:07:37 +00:00
Only (re)subscribe MQTT topics using the debouncer (#119995)
* Only (re)subscribe using the debouncer * Update test * Fix test * Reset mock
This commit is contained in:
parent
0053c92d2b
commit
60ba80a28d
@ -1035,7 +1035,8 @@ class MQTT:
|
|||||||
self, birth_message: PublishMessage
|
self, birth_message: PublishMessage
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Resubscribe to all topics and publish birth message."""
|
"""Resubscribe to all topics and publish birth message."""
|
||||||
await self._async_perform_subscriptions()
|
self._async_queue_resubscribe()
|
||||||
|
self._subscribe_debouncer.async_schedule()
|
||||||
await self._ha_started.wait() # Wait for Home Assistant to start
|
await self._ha_started.wait() # Wait for Home Assistant to start
|
||||||
await self._discovery_cooldown() # Wait for MQTT discovery to cool down
|
await self._discovery_cooldown() # Wait for MQTT discovery to cool down
|
||||||
# Update subscribe cooldown period to a shorter time
|
# Update subscribe cooldown period to a shorter time
|
||||||
@ -1091,7 +1092,6 @@ class MQTT:
|
|||||||
result_code,
|
result_code,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._async_queue_resubscribe()
|
|
||||||
birth: dict[str, Any]
|
birth: dict[str, Any]
|
||||||
if birth := self.conf.get(CONF_BIRTH_MESSAGE, DEFAULT_BIRTH):
|
if birth := self.conf.get(CONF_BIRTH_MESSAGE, DEFAULT_BIRTH):
|
||||||
birth_message = PublishMessage(**birth)
|
birth_message = PublishMessage(**birth)
|
||||||
@ -1102,13 +1102,8 @@ class MQTT:
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Update subscribe cooldown period to a shorter time
|
# Update subscribe cooldown period to a shorter time
|
||||||
self.config_entry.async_create_background_task(
|
self._async_queue_resubscribe()
|
||||||
self.hass,
|
self._subscribe_debouncer.async_schedule()
|
||||||
self._async_perform_subscriptions(),
|
|
||||||
name="mqtt re-subscribe",
|
|
||||||
)
|
|
||||||
self._subscribe_debouncer.set_timeout(SUBSCRIBE_COOLDOWN)
|
|
||||||
_LOGGER.info("MQTT client initialized")
|
|
||||||
|
|
||||||
self._async_connection_result(True)
|
self._async_connection_result(True)
|
||||||
|
|
||||||
|
@ -1583,6 +1583,8 @@ async def test_replaying_payload_same_topic(
|
|||||||
mqtt_client_mock.on_disconnect(None, None, 0)
|
mqtt_client_mock.on_disconnect(None, None, 0)
|
||||||
mqtt_client_mock.on_connect(None, None, None, 0)
|
mqtt_client_mock.on_connect(None, None, None, 0)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
||||||
|
await hass.async_block_till_done()
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
# Simulate a (retained) message played back after reconnecting
|
# Simulate a (retained) message played back after reconnecting
|
||||||
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
||||||
@ -1797,6 +1799,7 @@ async def test_not_calling_subscribe_when_unsubscribed_within_cooldown(
|
|||||||
assert not mqtt_client_mock.subscribe.called
|
assert not mqtt_client_mock.subscribe.called
|
||||||
|
|
||||||
|
|
||||||
|
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
||||||
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
async def test_unsubscribe_race(
|
async def test_unsubscribe_race(
|
||||||
@ -1808,6 +1811,9 @@ async def test_unsubscribe_race(
|
|||||||
mqtt_mock = await mqtt_mock_entry()
|
mqtt_mock = await mqtt_mock_entry()
|
||||||
# Fake that the client is connected
|
# Fake that the client is connected
|
||||||
mqtt_mock().connected = True
|
mqtt_mock().connected = True
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
calls_a: list[ReceiveMessage] = []
|
calls_a: list[ReceiveMessage] = []
|
||||||
calls_b: list[ReceiveMessage] = []
|
calls_b: list[ReceiveMessage] = []
|
||||||
@ -1868,6 +1874,10 @@ async def test_restore_subscriptions_on_reconnect(
|
|||||||
mqtt_mock = await mqtt_mock_entry()
|
mqtt_mock = await mqtt_mock_entry()
|
||||||
# Fake that the client is connected
|
# Fake that the client is connected
|
||||||
mqtt_mock().connected = True
|
mqtt_mock().connected = True
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
mqtt_client_mock.subscribe.reset_mock()
|
||||||
|
|
||||||
await mqtt.async_subscribe(hass, "test/state", record_calls)
|
await mqtt.async_subscribe(hass, "test/state", record_calls)
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
||||||
@ -1876,8 +1886,10 @@ async def test_restore_subscriptions_on_reconnect(
|
|||||||
|
|
||||||
mqtt_client_mock.on_disconnect(None, None, 0)
|
mqtt_client_mock.on_disconnect(None, None, 0)
|
||||||
mqtt_client_mock.on_connect(None, None, None, 0)
|
mqtt_client_mock.on_connect(None, None, None, 0)
|
||||||
|
await hass.async_block_till_done()
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert mqtt_client_mock.subscribe.call_count == 2
|
assert mqtt_client_mock.subscribe.call_count == 2
|
||||||
|
|
||||||
@ -2586,6 +2598,9 @@ async def test_default_birth_message(
|
|||||||
"mqtt_config_entry_data",
|
"mqtt_config_entry_data",
|
||||||
[{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_BIRTH_MESSAGE: {}}],
|
[{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_BIRTH_MESSAGE: {}}],
|
||||||
)
|
)
|
||||||
|
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
async def test_no_birth_message(
|
async def test_no_birth_message(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
mqtt_client_mock: MqttMockPahoClient,
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
@ -2593,23 +2608,26 @@ async def test_no_birth_message(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Test disabling birth message."""
|
"""Test disabling birth message."""
|
||||||
await mqtt_mock_entry()
|
await mqtt_mock_entry()
|
||||||
with patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.1):
|
await hass.async_block_till_done()
|
||||||
mqtt_client_mock.on_connect(None, None, 0, 0)
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
await asyncio.sleep(0.2)
|
mqtt_client_mock.reset_mock()
|
||||||
mqtt_client_mock.publish.assert_not_called()
|
|
||||||
|
# Assert no birth message was sent
|
||||||
|
mqtt_client_mock.on_connect(None, None, 0, 0)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
mqtt_client_mock.publish.assert_not_called()
|
||||||
|
|
||||||
async def callback(msg: ReceiveMessage) -> None:
|
async def callback(msg: ReceiveMessage) -> None:
|
||||||
"""Handle birth message."""
|
"""Handle birth message."""
|
||||||
|
|
||||||
# Assert the subscribe debouncer subscribes after
|
|
||||||
# about SUBSCRIBE_COOLDOWN (0.1) sec
|
|
||||||
# but sooner than INITIAL_SUBSCRIBE_COOLDOWN (1.0)
|
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
await mqtt.async_subscribe(hass, "homeassistant/some-topic", callback)
|
await mqtt.async_subscribe(hass, "homeassistant/some-topic", callback)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
await asyncio.sleep(0.2)
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
|
||||||
|
await hass.async_block_till_done()
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
|
|
||||||
|
|
||||||
@ -2690,15 +2708,16 @@ async def test_delayed_birth_message(
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
async def test_subscription_done_when_birth_message_is_sent(
|
async def test_subscription_done_when_birth_message_is_sent(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
mqtt_client_mock: MqttMockPahoClient,
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
|
record_calls: MessageCallbackType,
|
||||||
mqtt_config_entry_data,
|
mqtt_config_entry_data,
|
||||||
mqtt_mock_entry: MqttMockHAClientGenerator,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test sending birth message until initial subscription has been completed."""
|
"""Test sending birth message until initial subscription has been completed."""
|
||||||
mqtt_mock = await mqtt_mock_entry()
|
|
||||||
|
|
||||||
hass.set_state(CoreState.starting)
|
hass.set_state(CoreState.starting)
|
||||||
birth = asyncio.Event()
|
birth = asyncio.Event()
|
||||||
|
|
||||||
@ -2707,32 +2726,27 @@ async def test_subscription_done_when_birth_message_is_sent(
|
|||||||
entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data)
|
entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data)
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||||
|
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
mqtt_client_mock.on_disconnect(None, None, 0, 0)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
mqtt_component_mock = MagicMock(
|
|
||||||
return_value=hass.data["mqtt"].client,
|
|
||||||
wraps=hass.data["mqtt"].client,
|
|
||||||
)
|
|
||||||
mqtt_component_mock._mqttc = mqtt_client_mock
|
|
||||||
|
|
||||||
hass.data["mqtt"].client = mqtt_component_mock
|
|
||||||
mqtt_mock = hass.data["mqtt"].client
|
|
||||||
mqtt_mock.reset_mock()
|
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def wait_birth(msg: ReceiveMessage) -> None:
|
def wait_birth(msg: ReceiveMessage) -> None:
|
||||||
"""Handle birth message."""
|
"""Handle birth message."""
|
||||||
birth.set()
|
birth.set()
|
||||||
|
|
||||||
|
await mqtt.async_subscribe(hass, "topic/test", record_calls)
|
||||||
|
await mqtt.async_subscribe(hass, "homeassistant/status", wait_birth)
|
||||||
|
await hass.async_block_till_done()
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
with patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0):
|
mqtt_client_mock.on_connect(None, None, 0, 0)
|
||||||
await mqtt.async_subscribe(hass, "homeassistant/status", wait_birth)
|
# We wait until we receive a birth message
|
||||||
mqtt_client_mock.on_connect(None, None, 0, 0)
|
await asyncio.wait_for(birth.wait(), 1)
|
||||||
await hass.async_block_till_done()
|
|
||||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
|
||||||
await mqtt.async_subscribe(hass, "topic/test", record_calls)
|
|
||||||
# We wait until we receive a birth message
|
|
||||||
await asyncio.wait_for(birth.wait(), 1)
|
|
||||||
|
|
||||||
# Assert we already have subscribed at the client
|
# Assert we already have subscribed at the client
|
||||||
# for new config payloads at the time we the birth message is received
|
# for new config payloads at the time we the birth message is received
|
||||||
@ -2810,6 +2824,9 @@ async def test_no_will_message(
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
async def test_mqtt_subscribes_topics_on_connect(
|
async def test_mqtt_subscribes_topics_on_connect(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
mqtt_client_mock: MqttMockPahoClient,
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
@ -2818,6 +2835,10 @@ async def test_mqtt_subscribes_topics_on_connect(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Test subscription to topic on connect."""
|
"""Test subscription to topic on connect."""
|
||||||
await mqtt_mock_entry()
|
await mqtt_mock_entry()
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
mqtt_client_mock.reset_mock()
|
||||||
|
|
||||||
await mqtt.async_subscribe(hass, "topic/test", record_calls)
|
await mqtt.async_subscribe(hass, "topic/test", record_calls)
|
||||||
await mqtt.async_subscribe(hass, "home/sensor", record_calls, 2)
|
await mqtt.async_subscribe(hass, "home/sensor", record_calls, 2)
|
||||||
@ -2826,6 +2847,8 @@ async def test_mqtt_subscribes_topics_on_connect(
|
|||||||
|
|
||||||
mqtt_client_mock.on_connect(None, None, 0, 0)
|
mqtt_client_mock.on_connect(None, None, 0, 0)
|
||||||
|
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
assert mqtt_client_mock.disconnect.call_count == 0
|
assert mqtt_client_mock.disconnect.call_count == 0
|
||||||
|
Loading…
x
Reference in New Issue
Block a user