mirror of
https://github.com/home-assistant/core.git
synced 2025-07-16 09:47:13 +00:00
Add fixture to synchronize with debouncer in MQTT tests (#120373)
* Add fixture to synchronze with debouncer in MQTT tests * Migrate more tests to use the debouncer * Migrate more tests * Migrate util tests * Improve mqtt on_callback test using new fixture * Improve test_subscribe_error * Migrate other tests * Import EnsureJobAfterCooldown from `util.py` but patch `client.py`
This commit is contained in:
parent
b816fce976
commit
aa05f73210
@ -10,6 +10,7 @@ from typing_extensions import AsyncGenerator, Generator
|
|||||||
|
|
||||||
from homeassistant.components import mqtt
|
from homeassistant.components import mqtt
|
||||||
from homeassistant.components.mqtt.models import MessageCallbackType, ReceiveMessage
|
from homeassistant.components.mqtt.models import MessageCallbackType, ReceiveMessage
|
||||||
|
from homeassistant.components.mqtt.util import EnsureJobAfterCooldown
|
||||||
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
|
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
|
||||||
from homeassistant.core import HomeAssistant, callback
|
from homeassistant.core import HomeAssistant, callback
|
||||||
|
|
||||||
@ -49,6 +50,29 @@ def mock_temp_dir(temp_dir_prefix: str) -> Generator[str]:
|
|||||||
yield mocked_temp_dir
|
yield mocked_temp_dir
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_debouncer(hass: HomeAssistant) -> Generator[asyncio.Event]:
|
||||||
|
"""Mock EnsureJobAfterCooldown.
|
||||||
|
|
||||||
|
Returns an asyncio.Event that allows to await the debouncer task to be finished.
|
||||||
|
"""
|
||||||
|
task_done = asyncio.Event()
|
||||||
|
|
||||||
|
class MockDeboncer(EnsureJobAfterCooldown):
|
||||||
|
"""Mock the MQTT client (un)subscribe debouncer."""
|
||||||
|
|
||||||
|
async def _async_job(self) -> None:
|
||||||
|
"""Execute after a cooldown period."""
|
||||||
|
await super()._async_job()
|
||||||
|
task_done.set()
|
||||||
|
|
||||||
|
# We mock the import of EnsureJobAfterCooldown in client.py
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.mqtt.client.EnsureJobAfterCooldown", MockDeboncer
|
||||||
|
):
|
||||||
|
yield task_done
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def setup_with_birth_msg_client_mock(
|
async def setup_with_birth_msg_client_mock(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
@ -140,13 +140,13 @@ async def test_mqtt_connects_on_home_assistant_mqtt_setup(
|
|||||||
|
|
||||||
async def test_mqtt_does_not_disconnect_on_home_assistant_stop(
|
async def test_mqtt_does_not_disconnect_on_home_assistant_stop(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test if client is not disconnected on HA stop."""
|
"""Test if client is not disconnected on HA stop."""
|
||||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
mqtt_client_mock = setup_with_birth_msg_client_mock
|
||||||
hass.bus.fire(EVENT_HOMEASSISTANT_STOP)
|
hass.bus.fire(EVENT_HOMEASSISTANT_STOP)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert mqtt_client_mock.disconnect.call_count == 0
|
assert mqtt_client_mock.disconnect.call_count == 0
|
||||||
|
|
||||||
|
|
||||||
@ -1085,6 +1085,7 @@ async def test_subscribe_mqtt_config_entry_disabled(
|
|||||||
async def test_subscribe_and_resubscribe(
|
async def test_subscribe_and_resubscribe(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
client_debug_log: None,
|
client_debug_log: None,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
recorded_calls: list[ReceiveMessage],
|
recorded_calls: list[ReceiveMessage],
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
@ -1095,15 +1096,16 @@ async def test_subscribe_and_resubscribe(
|
|||||||
patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.4),
|
patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.4),
|
||||||
patch("homeassistant.components.mqtt.client.UNSUBSCRIBE_COOLDOWN", 0.4),
|
patch("homeassistant.components.mqtt.client.UNSUBSCRIBE_COOLDOWN", 0.4),
|
||||||
):
|
):
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls)
|
unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls)
|
||||||
# This unsub will be un-done with the following subscribe
|
# This unsub will be un-done with the following subscribe
|
||||||
# unsubscribe should not be called at the broker
|
# unsubscribe should not be called at the broker
|
||||||
unsub()
|
unsub()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls)
|
unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls)
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
mock_debouncer.clear()
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", "test-payload")
|
async_fire_mqtt_message(hass, "test-topic", "test-payload")
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
|
|
||||||
assert len(recorded_calls) == 1
|
assert len(recorded_calls) == 1
|
||||||
assert recorded_calls[0].topic == "test-topic"
|
assert recorded_calls[0].topic == "test-topic"
|
||||||
@ -1111,38 +1113,41 @@ async def test_subscribe_and_resubscribe(
|
|||||||
# assert unsubscribe was not called
|
# assert unsubscribe was not called
|
||||||
mqtt_client_mock.unsubscribe.assert_not_called()
|
mqtt_client_mock.unsubscribe.assert_not_called()
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub()
|
unsub()
|
||||||
|
|
||||||
await asyncio.sleep(0.1)
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
mqtt_client_mock.unsubscribe.assert_called_once_with(["test-topic"])
|
mqtt_client_mock.unsubscribe.assert_called_once_with(["test-topic"])
|
||||||
|
|
||||||
|
|
||||||
async def test_subscribe_topic_non_async(
|
async def test_subscribe_topic_non_async(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
mqtt_mock_entry: MqttMockHAClientGenerator,
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||||
recorded_calls: list[ReceiveMessage],
|
recorded_calls: list[ReceiveMessage],
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test the subscription of a topic using the non-async function."""
|
"""Test the subscription of a topic using the non-async function."""
|
||||||
await mqtt_mock_entry()
|
await mqtt_mock_entry()
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub = await hass.async_add_executor_job(
|
unsub = await hass.async_add_executor_job(
|
||||||
mqtt.subscribe, hass, "test-topic", record_calls
|
mqtt.subscribe, hass, "test-topic", record_calls
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", "test-payload")
|
async_fire_mqtt_message(hass, "test-topic", "test-payload")
|
||||||
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(recorded_calls) == 1
|
assert len(recorded_calls) == 1
|
||||||
assert recorded_calls[0].topic == "test-topic"
|
assert recorded_calls[0].topic == "test-topic"
|
||||||
assert recorded_calls[0].payload == "test-payload"
|
assert recorded_calls[0].payload == "test-payload"
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
await hass.async_add_executor_job(unsub)
|
await hass.async_add_executor_job(unsub)
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", "test-payload")
|
async_fire_mqtt_message(hass, "test-topic", "test-payload")
|
||||||
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(recorded_calls) == 1
|
assert len(recorded_calls) == 1
|
||||||
|
|
||||||
|
|
||||||
@ -1417,11 +1422,9 @@ async def test_subscribe_special_characters(
|
|||||||
assert recorded_calls[0].payload == payload
|
assert recorded_calls[0].payload == payload
|
||||||
|
|
||||||
|
|
||||||
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
|
||||||
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
|
||||||
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
|
||||||
async def test_subscribe_same_topic(
|
async def test_subscribe_same_topic(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test subscribing to same topic twice and simulate retained messages.
|
"""Test subscribing to same topic twice and simulate retained messages.
|
||||||
@ -1442,25 +1445,22 @@ async def test_subscribe_same_topic(
|
|||||||
calls_b.append(msg)
|
calls_b.append(msg)
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/state", _callback_a, qos=0)
|
await mqtt.async_subscribe(hass, "test/state", _callback_a, qos=0)
|
||||||
# Simulate a non retained message after the first subscription
|
# Simulate a non retained message after the first subscription
|
||||||
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
|
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=1))
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(calls_a) == 1
|
assert len(calls_a) == 1
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
calls_a = []
|
calls_a = []
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/state", _callback_b, qos=1)
|
await mqtt.async_subscribe(hass, "test/state", _callback_b, qos=1)
|
||||||
# Simulate an other non retained message after the second subscription
|
# Simulate an other non retained message after the second subscription
|
||||||
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
|
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=1))
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=1))
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
# Both subscriptions should receive updates
|
# Both subscriptions should receive updates
|
||||||
assert len(calls_a) == 1
|
assert len(calls_a) == 1
|
||||||
assert len(calls_b) == 1
|
assert len(calls_b) == 1
|
||||||
@ -1469,6 +1469,7 @@ async def test_subscribe_same_topic(
|
|||||||
|
|
||||||
async def test_replaying_payload_same_topic(
|
async def test_replaying_payload_same_topic(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test replaying retained messages.
|
"""Test replaying retained messages.
|
||||||
@ -1491,21 +1492,20 @@ async def test_replaying_payload_same_topic(
|
|||||||
calls_b.append(msg)
|
calls_b.append(msg)
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/state", _callback_a)
|
await mqtt.async_subscribe(hass, "test/state", _callback_a)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
async_fire_mqtt_message(
|
async_fire_mqtt_message(
|
||||||
hass, "test/state", "online", qos=0, retain=True
|
hass, "test/state", "online", qos=0, retain=True
|
||||||
) # Simulate a (retained) message played back
|
) # Simulate a (retained) message played back
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
assert len(calls_a) == 1
|
assert len(calls_a) == 1
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
calls_a = []
|
calls_a = []
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/state", _callback_b)
|
await mqtt.async_subscribe(hass, "test/state", _callback_b)
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
# Simulate edge case where non retained message was received
|
# Simulate edge case where non retained message was received
|
||||||
# after subscription at HA but before the debouncer delay was passed.
|
# after subscription at HA but before the debouncer delay was passed.
|
||||||
@ -1516,12 +1516,6 @@ async def test_replaying_payload_same_topic(
|
|||||||
# Simulate a (retained) message played back on new subscriptions
|
# Simulate a (retained) message played back on new subscriptions
|
||||||
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
||||||
|
|
||||||
# Make sure the debouncer delay was passed
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
# The current subscription only received the message without retain flag
|
# The current subscription only received the message without retain flag
|
||||||
assert len(calls_a) == 1
|
assert len(calls_a) == 1
|
||||||
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False)
|
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False)
|
||||||
@ -1542,10 +1536,6 @@ async def test_replaying_payload_same_topic(
|
|||||||
# After connecting the retain flag will not be set, even if the
|
# After connecting the retain flag will not be set, even if the
|
||||||
# payload published was retained, we cannot see that
|
# payload published was retained, we cannot see that
|
||||||
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
|
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=False)
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(calls_a) == 1
|
assert len(calls_a) == 1
|
||||||
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False)
|
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=False)
|
||||||
assert len(calls_b) == 1
|
assert len(calls_b) == 1
|
||||||
@ -1556,18 +1546,13 @@ async def test_replaying_payload_same_topic(
|
|||||||
calls_b = []
|
calls_b = []
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
mqtt_client_mock.on_disconnect(None, None, 0)
|
mqtt_client_mock.on_disconnect(None, None, 0)
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
mqtt_client_mock.on_connect(None, None, None, 0)
|
mqtt_client_mock.on_connect(None, None, None, 0)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
# Simulate a (retained) message played back after reconnecting
|
# Simulate a (retained) message played back after reconnecting
|
||||||
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
# Both subscriptions now should replay the retained message
|
# Both subscriptions now should replay the retained message
|
||||||
assert len(calls_a) == 1
|
assert len(calls_a) == 1
|
||||||
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
|
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
|
||||||
@ -1575,11 +1560,9 @@ async def test_replaying_payload_same_topic(
|
|||||||
assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=True)
|
assert help_assert_message(calls_b[0], "test/state", "online", qos=0, retain=True)
|
||||||
|
|
||||||
|
|
||||||
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
|
||||||
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
|
||||||
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
|
||||||
async def test_replaying_payload_after_resubscribing(
|
async def test_replaying_payload_after_resubscribing(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test replaying and filtering retained messages after resubscribing.
|
"""Test replaying and filtering retained messages after resubscribing.
|
||||||
@ -1597,22 +1580,18 @@ async def test_replaying_payload_after_resubscribing(
|
|||||||
calls_a.append(msg)
|
calls_a.append(msg)
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a)
|
unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
|
|
||||||
# Simulate a (retained) message played back
|
# Simulate a (retained) message played back
|
||||||
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
|
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
|
||||||
calls_a.clear()
|
calls_a.clear()
|
||||||
|
|
||||||
# Test we get updates
|
# Test we get updates
|
||||||
async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=False)
|
async_fire_mqtt_message(hass, "test/state", "offline", qos=0, retain=False)
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert help_assert_message(calls_a[0], "test/state", "offline", qos=0, retain=False)
|
assert help_assert_message(calls_a[0], "test/state", "offline", qos=0, retain=False)
|
||||||
calls_a.clear()
|
calls_a.clear()
|
||||||
|
|
||||||
@ -1622,24 +1601,20 @@ async def test_replaying_payload_after_resubscribing(
|
|||||||
assert len(calls_a) == 0
|
assert len(calls_a) == 0
|
||||||
|
|
||||||
# Unsubscribe an resubscribe again
|
# Unsubscribe an resubscribe again
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub()
|
unsub()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a)
|
unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
|
|
||||||
# Simulate we can receive a (retained) played back message again
|
# Simulate we can receive a (retained) played back message again
|
||||||
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state", "online", qos=0, retain=True)
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
|
assert help_assert_message(calls_a[0], "test/state", "online", qos=0, retain=True)
|
||||||
|
|
||||||
|
|
||||||
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
|
||||||
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
|
||||||
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
|
||||||
async def test_replaying_payload_wildcard_topic(
|
async def test_replaying_payload_wildcard_topic(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test replaying retained messages.
|
"""Test replaying retained messages.
|
||||||
@ -1663,28 +1638,24 @@ async def test_replaying_payload_wildcard_topic(
|
|||||||
calls_b.append(msg)
|
calls_b.append(msg)
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/#", _callback_a)
|
await mqtt.async_subscribe(hass, "test/#", _callback_a)
|
||||||
|
await mock_debouncer.wait()
|
||||||
# Simulate (retained) messages being played back on new subscriptions
|
# Simulate (retained) messages being played back on new subscriptions
|
||||||
async_fire_mqtt_message(hass, "test/state1", "new_value_1", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state1", "new_value_1", qos=0, retain=True)
|
||||||
async_fire_mqtt_message(hass, "test/state2", "new_value_2", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state2", "new_value_2", qos=0, retain=True)
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(calls_a) == 2
|
assert len(calls_a) == 2
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
calls_a = []
|
calls_a = []
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
|
||||||
# resubscribe to the wild card topic again
|
# resubscribe to the wild card topic again
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/#", _callback_b)
|
await mqtt.async_subscribe(hass, "test/#", _callback_b)
|
||||||
|
await mock_debouncer.wait()
|
||||||
# Simulate (retained) messages being played back on new subscriptions
|
# Simulate (retained) messages being played back on new subscriptions
|
||||||
async_fire_mqtt_message(hass, "test/state1", "initial_value_1", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state1", "initial_value_1", qos=0, retain=True)
|
||||||
async_fire_mqtt_message(hass, "test/state2", "initial_value_2", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state2", "initial_value_2", qos=0, retain=True)
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
# The retained messages playback should only be processed for the new subscriptions
|
# The retained messages playback should only be processed for the new subscriptions
|
||||||
assert len(calls_a) == 0
|
assert len(calls_a) == 0
|
||||||
assert len(calls_b) == 2
|
assert len(calls_b) == 2
|
||||||
@ -1697,8 +1668,6 @@ async def test_replaying_payload_wildcard_topic(
|
|||||||
# Simulate new messages being received
|
# Simulate new messages being received
|
||||||
async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=False)
|
async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=False)
|
||||||
async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=False)
|
async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=False)
|
||||||
await hass.async_block_till_done()
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
assert len(calls_a) == 2
|
assert len(calls_a) == 2
|
||||||
assert len(calls_b) == 2
|
assert len(calls_b) == 2
|
||||||
|
|
||||||
@ -1707,20 +1676,16 @@ async def test_replaying_payload_wildcard_topic(
|
|||||||
calls_b = []
|
calls_b = []
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
mqtt_client_mock.on_disconnect(None, None, 0)
|
mqtt_client_mock.on_disconnect(None, None, 0)
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
mqtt_client_mock.on_connect(None, None, None, 0)
|
mqtt_client_mock.on_connect(None, None, None, 0)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
# Simulate the (retained) messages are played back after reconnecting
|
# Simulate the (retained) messages are played back after reconnecting
|
||||||
# for all subscriptions
|
# for all subscriptions
|
||||||
async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state1", "update_value_1", qos=0, retain=True)
|
||||||
async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=True)
|
async_fire_mqtt_message(hass, "test/state2", "update_value_2", qos=0, retain=True)
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
# Both subscriptions should replay
|
# Both subscriptions should replay
|
||||||
assert len(calls_a) == 2
|
assert len(calls_a) == 2
|
||||||
assert len(calls_b) == 2
|
assert len(calls_b) == 2
|
||||||
@ -1728,29 +1693,32 @@ async def test_replaying_payload_wildcard_topic(
|
|||||||
|
|
||||||
async def test_not_calling_unsubscribe_with_active_subscribers(
|
async def test_not_calling_unsubscribe_with_active_subscribers(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test not calling unsubscribe() when other subscribers are active."""
|
"""Test not calling unsubscribe() when other subscribers are active."""
|
||||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
mqtt_client_mock = setup_with_birth_msg_client_mock
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test/state", record_calls, 2)
|
unsub = await mqtt.async_subscribe(hass, "test/state", record_calls, 2)
|
||||||
await mqtt.async_subscribe(hass, "test/state", record_calls, 1)
|
await mqtt.async_subscribe(hass, "test/state", record_calls, 1)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert mqtt_client_mock.subscribe.called
|
assert mqtt_client_mock.subscribe.called
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub()
|
unsub()
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
||||||
assert not mqtt_client_mock.unsubscribe.called
|
assert not mqtt_client_mock.unsubscribe.called
|
||||||
|
assert not mock_debouncer.is_set()
|
||||||
|
|
||||||
|
|
||||||
async def test_not_calling_subscribe_when_unsubscribed_within_cooldown(
|
async def test_not_calling_subscribe_when_unsubscribed_within_cooldown(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
mock_debouncer: asyncio.Event,
|
||||||
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test not calling subscribe() when it is unsubscribed.
|
"""Test not calling subscribe() when it is unsubscribed.
|
||||||
@ -1758,18 +1726,22 @@ async def test_not_calling_subscribe_when_unsubscribed_within_cooldown(
|
|||||||
Make sure subscriptions are cleared if unsubscribed before
|
Make sure subscriptions are cleared if unsubscribed before
|
||||||
the subscribe cool down period has ended.
|
the subscribe cool down period has ended.
|
||||||
"""
|
"""
|
||||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
mqtt_mock = await mqtt_mock_entry()
|
||||||
mqtt_client_mock.subscribe.reset_mock()
|
mqtt_client_mock = mqtt_mock._mqttc
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
|
mqtt_client_mock.subscribe.reset_mock()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test/state", record_calls)
|
unsub = await mqtt.async_subscribe(hass, "test/state", record_calls)
|
||||||
unsub()
|
unsub()
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
# The debouncer executes without an pending subscribes
|
||||||
assert not mqtt_client_mock.subscribe.called
|
assert not mqtt_client_mock.subscribe.called
|
||||||
|
|
||||||
|
|
||||||
async def test_unsubscribe_race(
|
async def test_unsubscribe_race(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test not calling unsubscribe() when other subscribers are active."""
|
"""Test not calling unsubscribe() when other subscribers are active."""
|
||||||
@ -1786,15 +1758,14 @@ async def test_unsubscribe_race(
|
|||||||
calls_b.append(msg)
|
calls_b.append(msg)
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a)
|
unsub = await mqtt.async_subscribe(hass, "test/state", _callback_a)
|
||||||
unsub()
|
unsub()
|
||||||
await mqtt.async_subscribe(hass, "test/state", _callback_b)
|
await mqtt.async_subscribe(hass, "test/state", _callback_b)
|
||||||
await asyncio.sleep(0)
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test/state", "online")
|
async_fire_mqtt_message(hass, "test/state", "online")
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert not calls_a
|
assert not calls_a
|
||||||
assert calls_b
|
assert calls_b
|
||||||
|
|
||||||
@ -1825,6 +1796,7 @@ async def test_unsubscribe_race(
|
|||||||
)
|
)
|
||||||
async def test_restore_subscriptions_on_reconnect(
|
async def test_restore_subscriptions_on_reconnect(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -1833,18 +1805,18 @@ async def test_restore_subscriptions_on_reconnect(
|
|||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/state", record_calls)
|
await mqtt.async_subscribe(hass, "test/state", record_calls)
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown
|
||||||
await asyncio.sleep(0)
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock)
|
assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock)
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
mqtt_client_mock.on_disconnect(None, None, 0)
|
mqtt_client_mock.on_disconnect(None, None, 0)
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
mqtt_client_mock.on_connect(None, None, None, 0)
|
mqtt_client_mock.on_connect(None, None, None, 0)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock)
|
assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock)
|
||||||
|
|
||||||
|
|
||||||
@ -1854,20 +1826,19 @@ async def test_restore_subscriptions_on_reconnect(
|
|||||||
)
|
)
|
||||||
async def test_restore_all_active_subscriptions_on_reconnect(
|
async def test_restore_all_active_subscriptions_on_reconnect(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
freezer: FrozenDateTimeFactory,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test active subscriptions are restored correctly on reconnect."""
|
"""Test active subscriptions are restored correctly on reconnect."""
|
||||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
mqtt_client_mock = setup_with_birth_msg_client_mock
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test/state", record_calls, qos=2)
|
unsub = await mqtt.async_subscribe(hass, "test/state", record_calls, qos=2)
|
||||||
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=1)
|
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=1)
|
||||||
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=0)
|
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=0)
|
||||||
await hass.async_block_till_done()
|
# cooldown
|
||||||
freezer.tick(3)
|
await mock_debouncer.wait()
|
||||||
async_fire_time_changed(hass) # cooldown
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
# the subscription with the highest QoS should survive
|
# the subscription with the highest QoS should survive
|
||||||
expected = [
|
expected = [
|
||||||
@ -1876,68 +1847,54 @@ async def test_restore_all_active_subscriptions_on_reconnect(
|
|||||||
assert mqtt_client_mock.subscribe.mock_calls == expected
|
assert mqtt_client_mock.subscribe.mock_calls == expected
|
||||||
|
|
||||||
unsub()
|
unsub()
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert mqtt_client_mock.unsubscribe.call_count == 0
|
assert mqtt_client_mock.unsubscribe.call_count == 0
|
||||||
|
|
||||||
mqtt_client_mock.on_disconnect(None, None, 0)
|
mqtt_client_mock.on_disconnect(None, None, 0)
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
mock_debouncer.clear()
|
||||||
mqtt_client_mock.on_connect(None, None, None, 0)
|
mqtt_client_mock.on_connect(None, None, None, 0)
|
||||||
freezer.tick(3)
|
# wait for cooldown
|
||||||
async_fire_time_changed(hass) # cooldown
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
expected.append(call([("test/state", 1)]))
|
expected.append(call([("test/state", 1)]))
|
||||||
for expected_call in expected:
|
for expected_call in expected:
|
||||||
assert mqtt_client_mock.subscribe.hass_call(expected_call)
|
assert mqtt_client_mock.subscribe.hass_call(expected_call)
|
||||||
|
|
||||||
freezer.tick(3)
|
|
||||||
async_fire_time_changed(hass) # cooldown
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
freezer.tick(3)
|
|
||||||
async_fire_time_changed(hass) # cooldown
|
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"mqtt_config_entry_data",
|
"mqtt_config_entry_data",
|
||||||
[{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_DISCOVERY: False}],
|
[{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_DISCOVERY: False}],
|
||||||
)
|
)
|
||||||
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 1.0)
|
|
||||||
@patch("homeassistant.components.mqtt.client.DISCOVERY_COOLDOWN", 0.0)
|
|
||||||
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 1.0)
|
|
||||||
async def test_subscribed_at_highest_qos(
|
async def test_subscribed_at_highest_qos(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
freezer: FrozenDateTimeFactory,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test the highest qos as assigned when subscribing to the same topic."""
|
"""Test the highest qos as assigned when subscribing to the same topic."""
|
||||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
mqtt_client_mock = setup_with_birth_msg_client_mock
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=0)
|
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=0)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
freezer.tick(5)
|
# cooldown
|
||||||
async_fire_time_changed(hass) # cooldown
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock)
|
assert ("test/state", 0) in help_all_subscribe_calls(mqtt_client_mock)
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
freezer.tick(5)
|
|
||||||
async_fire_time_changed(hass) # cooldown
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=1)
|
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=1)
|
||||||
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=2)
|
await mqtt.async_subscribe(hass, "test/state", record_calls, qos=2)
|
||||||
await hass.async_block_till_done()
|
# cooldown
|
||||||
freezer.tick(5)
|
await mock_debouncer.wait()
|
||||||
async_fire_time_changed(hass) # cooldown
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
# the subscription with the highest QoS should survive
|
# the subscription with the highest QoS should survive
|
||||||
assert help_all_subscribe_calls(mqtt_client_mock) == [("test/state", 2)]
|
assert help_all_subscribe_calls(mqtt_client_mock) == [("test/state", 2)]
|
||||||
|
|
||||||
|
|
||||||
async def test_reload_entry_with_restored_subscriptions(
|
async def test_reload_entry_with_restored_subscriptions(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
mqtt_client_mock: MqttMockPahoClient,
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
recorded_calls: list[ReceiveMessage],
|
recorded_calls: list[ReceiveMessage],
|
||||||
@ -1950,13 +1907,15 @@ async def test_reload_entry_with_restored_subscriptions(
|
|||||||
with patch("homeassistant.config.load_yaml_config_file", return_value={}):
|
with patch("homeassistant.config.load_yaml_config_file", return_value={}):
|
||||||
await hass.config_entries.async_setup(entry.entry_id)
|
await hass.config_entries.async_setup(entry.entry_id)
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "test-topic", record_calls)
|
await mqtt.async_subscribe(hass, "test-topic", record_calls)
|
||||||
await mqtt.async_subscribe(hass, "wild/+/card", record_calls)
|
await mqtt.async_subscribe(hass, "wild/+/card", record_calls)
|
||||||
|
# cooldown
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", "test-payload")
|
async_fire_mqtt_message(hass, "test-topic", "test-payload")
|
||||||
async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload")
|
async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload")
|
||||||
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(recorded_calls) == 2
|
assert len(recorded_calls) == 2
|
||||||
assert recorded_calls[0].topic == "test-topic"
|
assert recorded_calls[0].topic == "test-topic"
|
||||||
assert recorded_calls[0].payload == "test-payload"
|
assert recorded_calls[0].payload == "test-payload"
|
||||||
@ -1967,13 +1926,14 @@ async def test_reload_entry_with_restored_subscriptions(
|
|||||||
# Reload the entry
|
# Reload the entry
|
||||||
with patch("homeassistant.config.load_yaml_config_file", return_value={}):
|
with patch("homeassistant.config.load_yaml_config_file", return_value={}):
|
||||||
assert await hass.config_entries.async_reload(entry.entry_id)
|
assert await hass.config_entries.async_reload(entry.entry_id)
|
||||||
|
mock_debouncer.clear()
|
||||||
assert entry.state is ConfigEntryState.LOADED
|
assert entry.state is ConfigEntryState.LOADED
|
||||||
await hass.async_block_till_done()
|
# cooldown
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", "test-payload2")
|
async_fire_mqtt_message(hass, "test-topic", "test-payload2")
|
||||||
async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload2")
|
async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload2")
|
||||||
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(recorded_calls) == 2
|
assert len(recorded_calls) == 2
|
||||||
assert recorded_calls[0].topic == "test-topic"
|
assert recorded_calls[0].topic == "test-topic"
|
||||||
assert recorded_calls[0].payload == "test-payload2"
|
assert recorded_calls[0].payload == "test-payload2"
|
||||||
@ -1984,13 +1944,14 @@ async def test_reload_entry_with_restored_subscriptions(
|
|||||||
# Reload the entry again
|
# Reload the entry again
|
||||||
with patch("homeassistant.config.load_yaml_config_file", return_value={}):
|
with patch("homeassistant.config.load_yaml_config_file", return_value={}):
|
||||||
assert await hass.config_entries.async_reload(entry.entry_id)
|
assert await hass.config_entries.async_reload(entry.entry_id)
|
||||||
|
mock_debouncer.clear()
|
||||||
assert entry.state is ConfigEntryState.LOADED
|
assert entry.state is ConfigEntryState.LOADED
|
||||||
await hass.async_block_till_done()
|
# cooldown
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", "test-payload3")
|
async_fire_mqtt_message(hass, "test-topic", "test-payload3")
|
||||||
async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload3")
|
async_fire_mqtt_message(hass, "wild/any/card", "wild-card-payload3")
|
||||||
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(recorded_calls) == 2
|
assert len(recorded_calls) == 2
|
||||||
assert recorded_calls[0].topic == "test-topic"
|
assert recorded_calls[0].topic == "test-topic"
|
||||||
assert recorded_calls[0].payload == "test-payload3"
|
assert recorded_calls[0].payload == "test-payload3"
|
||||||
@ -2079,9 +2040,9 @@ async def test_handle_mqtt_on_callback_after_timeout(
|
|||||||
"""Test receiving an ACK after a timeout."""
|
"""Test receiving an ACK after a timeout."""
|
||||||
mqtt_mock = await mqtt_mock_entry()
|
mqtt_mock = await mqtt_mock_entry()
|
||||||
# Simulate the mid future getting a timeout
|
# Simulate the mid future getting a timeout
|
||||||
mqtt_mock()._async_get_mid_future(100).set_exception(asyncio.TimeoutError)
|
mqtt_mock()._async_get_mid_future(101).set_exception(asyncio.TimeoutError)
|
||||||
# Simulate an ACK for mid == 100, being received after the timeout
|
# Simulate an ACK for mid == 101, being received after the timeout
|
||||||
mqtt_client_mock.on_publish(mqtt_client_mock, None, 100)
|
mqtt_client_mock.on_publish(mqtt_client_mock, None, 101)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert "No ACK from MQTT server" not in caplog.text
|
assert "No ACK from MQTT server" not in caplog.text
|
||||||
assert "InvalidStateError" not in caplog.text
|
assert "InvalidStateError" not in caplog.text
|
||||||
@ -2119,20 +2080,18 @@ async def test_subscribe_error(
|
|||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
# simulate client is not connected error before subscribing
|
# simulate client is not connected error before subscribing
|
||||||
mqtt_client_mock.subscribe.side_effect = lambda *args: (4, None)
|
mqtt_client_mock.subscribe.side_effect = lambda *args: (4, None)
|
||||||
with patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0):
|
await mqtt.async_subscribe(hass, "some-topic", record_calls)
|
||||||
await mqtt.async_subscribe(hass, "some-topic", record_calls)
|
while mqtt_client_mock.subscribe.call_count == 0:
|
||||||
while mqtt_client_mock.subscribe.call_count == 0:
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert (
|
assert (
|
||||||
"Error talking to MQTT: The client is not currently connected."
|
"Error talking to MQTT: The client is not currently connected." in caplog.text
|
||||||
in caplog.text
|
)
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_handle_message_callback(
|
async def test_handle_message_callback(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test for handling an incoming message callback."""
|
"""Test for handling an incoming message callback."""
|
||||||
@ -2146,12 +2105,12 @@ async def test_handle_message_callback(
|
|||||||
msg = ReceiveMessage(
|
msg = ReceiveMessage(
|
||||||
"some-topic", b"test-payload", 1, False, "some-topic", datetime.now()
|
"some-topic", b"test-payload", 1, False, "some-topic", datetime.now()
|
||||||
)
|
)
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "some-topic", _callback)
|
await mqtt.async_subscribe(hass, "some-topic", _callback)
|
||||||
|
await mock_debouncer.wait()
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
mqtt_client_mock.on_message(None, None, msg)
|
mqtt_client_mock.on_message(None, None, msg)
|
||||||
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert len(callbacks) == 1
|
assert len(callbacks) == 1
|
||||||
assert callbacks[0].topic == "some-topic"
|
assert callbacks[0].topic == "some-topic"
|
||||||
assert callbacks[0].qos == 1
|
assert callbacks[0].qos == 1
|
||||||
@ -2239,7 +2198,7 @@ async def test_setup_mqtt_client_protocol(
|
|||||||
|
|
||||||
@patch("homeassistant.components.mqtt.client.TIMEOUT_ACK", 0.2)
|
@patch("homeassistant.components.mqtt.client.TIMEOUT_ACK", 0.2)
|
||||||
async def test_handle_mqtt_timeout_on_callback(
|
async def test_handle_mqtt_timeout_on_callback(
|
||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_debouncer: asyncio.Event
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test publish without receiving an ACK callback."""
|
"""Test publish without receiving an ACK callback."""
|
||||||
mid = 0
|
mid = 0
|
||||||
@ -2247,7 +2206,7 @@ async def test_handle_mqtt_timeout_on_callback(
|
|||||||
class FakeInfo:
|
class FakeInfo:
|
||||||
"""Returns a simulated client publish response."""
|
"""Returns a simulated client publish response."""
|
||||||
|
|
||||||
mid = 100
|
mid = 102
|
||||||
rc = 0
|
rc = 0
|
||||||
|
|
||||||
with patch(
|
with patch(
|
||||||
@ -2264,7 +2223,9 @@ async def test_handle_mqtt_timeout_on_callback(
|
|||||||
# We want to simulate the publish behaviour MQTT client
|
# We want to simulate the publish behaviour MQTT client
|
||||||
mock_client = mock_client.return_value
|
mock_client = mock_client.return_value
|
||||||
mock_client.publish.return_value = FakeInfo()
|
mock_client.publish.return_value = FakeInfo()
|
||||||
|
# Mock we get a mid and rc=0
|
||||||
mock_client.subscribe.side_effect = _mock_ack
|
mock_client.subscribe.side_effect = _mock_ack
|
||||||
|
mock_client.unsubscribe.side_effect = _mock_ack
|
||||||
mock_client.connect = MagicMock(
|
mock_client.connect = MagicMock(
|
||||||
return_value=0,
|
return_value=0,
|
||||||
side_effect=lambda *args, **kwargs: hass.loop.call_soon_threadsafe(
|
side_effect=lambda *args, **kwargs: hass.loop.call_soon_threadsafe(
|
||||||
@ -2278,6 +2239,7 @@ async def test_handle_mqtt_timeout_on_callback(
|
|||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
# Set up the integration
|
# Set up the integration
|
||||||
|
mock_debouncer.clear()
|
||||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||||
|
|
||||||
# Now call we publish without simulating and ACK callback
|
# Now call we publish without simulating and ACK callback
|
||||||
@ -2286,6 +2248,12 @@ async def test_handle_mqtt_timeout_on_callback(
|
|||||||
# There is no ACK so we should see a timeout in the log after publishing
|
# There is no ACK so we should see a timeout in the log after publishing
|
||||||
assert len(mock_client.publish.mock_calls) == 1
|
assert len(mock_client.publish.mock_calls) == 1
|
||||||
assert "No ACK from MQTT server" in caplog.text
|
assert "No ACK from MQTT server" in caplog.text
|
||||||
|
# Ensure we stop lingering background tasks
|
||||||
|
await hass.config_entries.async_unload(entry.entry_id)
|
||||||
|
# Assert we did not have any completed subscribes,
|
||||||
|
# because the debouncer subscribe job failed to receive an ACK,
|
||||||
|
# and the time auto caused the debouncer job to fail.
|
||||||
|
assert not mock_debouncer.is_set()
|
||||||
|
|
||||||
|
|
||||||
async def test_setup_raises_config_entry_not_ready_if_no_connect_broker(
|
async def test_setup_raises_config_entry_not_ready_if_no_connect_broker(
|
||||||
@ -2391,26 +2359,22 @@ async def test_tls_version(
|
|||||||
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
async def test_custom_birth_message(
|
async def test_custom_birth_message(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
mqtt_config_entry_data: dict[str, Any],
|
mqtt_config_entry_data: dict[str, Any],
|
||||||
mqtt_client_mock: MqttMockPahoClient,
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test sending birth message."""
|
"""Test sending birth message."""
|
||||||
|
|
||||||
birth = asyncio.Event()
|
|
||||||
entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data)
|
entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data)
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
hass.config.components.add(mqtt.DOMAIN)
|
hass.config.components.add(mqtt.DOMAIN)
|
||||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||||
|
mock_debouncer.clear()
|
||||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||||
|
# discovery cooldown
|
||||||
@callback
|
await mock_debouncer.wait()
|
||||||
def wait_birth(msg: ReceiveMessage) -> None:
|
# Wait for publish call to finish
|
||||||
"""Handle birth message."""
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
birth.set()
|
|
||||||
|
|
||||||
await mqtt.async_subscribe(hass, "birth", wait_birth)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
await birth.wait()
|
|
||||||
mqtt_client_mock.publish.assert_called_with("birth", "birth", 0, False)
|
mqtt_client_mock.publish.assert_called_with("birth", "birth", 0, False)
|
||||||
|
|
||||||
|
|
||||||
@ -2439,6 +2403,8 @@ async def test_default_birth_message(
|
|||||||
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
async def test_no_birth_message(
|
async def test_no_birth_message(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
record_calls: MessageCallbackType,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
mqtt_config_entry_data: dict[str, Any],
|
mqtt_config_entry_data: dict[str, Any],
|
||||||
mqtt_client_mock: MqttMockPahoClient,
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -2446,26 +2412,19 @@ async def test_no_birth_message(
|
|||||||
entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data)
|
entry = MockConfigEntry(domain=mqtt.DOMAIN, data=mqtt_config_entry_data)
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
hass.config.components.add(mqtt.DOMAIN)
|
hass.config.components.add(mqtt.DOMAIN)
|
||||||
|
mock_debouncer.clear()
|
||||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||||
await hass.async_block_till_done()
|
# Wait for discovery cooldown
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
# Ensure any publishing could have been processed
|
||||||
mqtt_client_mock.reset_mock()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
mqtt_client_mock.publish.assert_not_called()
|
mqtt_client_mock.publish.assert_not_called()
|
||||||
|
|
||||||
@callback
|
|
||||||
def msg_callback(msg: ReceiveMessage) -> None:
|
|
||||||
"""Handle callback."""
|
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
await mqtt.async_subscribe(hass, "homeassistant/some-topic", msg_callback)
|
mock_debouncer.clear()
|
||||||
await hass.async_block_till_done()
|
await mqtt.async_subscribe(hass, "homeassistant/some-topic", record_calls)
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
|
# Wait for discovery cooldown
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
mqtt_client_mock.subscribe.assert_called()
|
mqtt_client_mock.subscribe.assert_called()
|
||||||
|
|
||||||
|
|
||||||
@ -2487,7 +2446,6 @@ async def test_delayed_birth_message(
|
|||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
hass.config.components.add(mqtt.DOMAIN)
|
hass.config.components.add(mqtt.DOMAIN)
|
||||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def wait_birth(msg: ReceiveMessage) -> None:
|
def wait_birth(msg: ReceiveMessage) -> None:
|
||||||
@ -2495,7 +2453,6 @@ async def test_delayed_birth_message(
|
|||||||
birth.set()
|
birth.set()
|
||||||
|
|
||||||
await mqtt.async_subscribe(hass, "homeassistant/status", wait_birth)
|
await mqtt.async_subscribe(hass, "homeassistant/status", wait_birth)
|
||||||
await hass.async_block_till_done()
|
|
||||||
with pytest.raises(TimeoutError):
|
with pytest.raises(TimeoutError):
|
||||||
await asyncio.wait_for(birth.wait(), 0.05)
|
await asyncio.wait_for(birth.wait(), 0.05)
|
||||||
assert not mqtt_client_mock.publish.called
|
assert not mqtt_client_mock.publish.called
|
||||||
@ -2595,26 +2552,27 @@ async def test_no_will_message(
|
|||||||
)
|
)
|
||||||
async def test_mqtt_subscribes_topics_on_connect(
|
async def test_mqtt_subscribes_topics_on_connect(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test subscription to topic on connect."""
|
"""Test subscription to topic on connect."""
|
||||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
mqtt_client_mock = setup_with_birth_msg_client_mock
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "topic/test", record_calls)
|
await mqtt.async_subscribe(hass, "topic/test", record_calls)
|
||||||
await mqtt.async_subscribe(hass, "home/sensor", record_calls, 2)
|
await mqtt.async_subscribe(hass, "home/sensor", record_calls, 2)
|
||||||
await mqtt.async_subscribe(hass, "still/pending", record_calls)
|
await mqtt.async_subscribe(hass, "still/pending", record_calls)
|
||||||
await mqtt.async_subscribe(hass, "still/pending", record_calls, 1)
|
await mqtt.async_subscribe(hass, "still/pending", record_calls, 1)
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
mqtt_client_mock.on_disconnect(Mock(), None, 0)
|
mqtt_client_mock.on_disconnect(Mock(), None, 0)
|
||||||
|
|
||||||
mqtt_client_mock.reset_mock()
|
mqtt_client_mock.reset_mock()
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
mqtt_client_mock.on_connect(Mock(), None, 0, 0)
|
mqtt_client_mock.on_connect(Mock(), None, 0, 0)
|
||||||
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=3))
|
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
|
|
||||||
subscribe_calls = help_all_subscribe_calls(mqtt_client_mock)
|
subscribe_calls = help_all_subscribe_calls(mqtt_client_mock)
|
||||||
assert ("topic/test", 0) in subscribe_calls
|
assert ("topic/test", 0) in subscribe_calls
|
||||||
@ -2628,17 +2586,18 @@ async def test_mqtt_subscribes_topics_on_connect(
|
|||||||
)
|
)
|
||||||
async def test_mqtt_subscribes_in_single_call(
|
async def test_mqtt_subscribes_in_single_call(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test bundled client subscription to topic."""
|
"""Test bundled client subscription to topic."""
|
||||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
mqtt_client_mock = setup_with_birth_msg_client_mock
|
||||||
mqtt_client_mock.subscribe.reset_mock()
|
mqtt_client_mock.subscribe.reset_mock()
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(hass, "topic/test", record_calls)
|
await mqtt.async_subscribe(hass, "topic/test", record_calls)
|
||||||
await mqtt.async_subscribe(hass, "home/sensor", record_calls)
|
await mqtt.async_subscribe(hass, "home/sensor", record_calls)
|
||||||
# Make sure the debouncer finishes
|
# Make sure the debouncer finishes
|
||||||
await asyncio.sleep(0)
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
|
|
||||||
assert mqtt_client_mock.subscribe.call_count == 1
|
assert mqtt_client_mock.subscribe.call_count == 1
|
||||||
# Assert we have a single subscription call with both subscriptions
|
# Assert we have a single subscription call with both subscriptions
|
||||||
@ -2653,6 +2612,7 @@ async def test_mqtt_subscribes_in_single_call(
|
|||||||
@patch("homeassistant.components.mqtt.client.MAX_UNSUBSCRIBES_PER_CALL", 2)
|
@patch("homeassistant.components.mqtt.client.MAX_UNSUBSCRIBES_PER_CALL", 2)
|
||||||
async def test_mqtt_subscribes_and_unsubscribes_in_chunks(
|
async def test_mqtt_subscribes_and_unsubscribes_in_chunks(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -2661,13 +2621,13 @@ async def test_mqtt_subscribes_and_unsubscribes_in_chunks(
|
|||||||
|
|
||||||
mqtt_client_mock.subscribe.reset_mock()
|
mqtt_client_mock.subscribe.reset_mock()
|
||||||
unsub_tasks: list[CALLBACK_TYPE] = []
|
unsub_tasks: list[CALLBACK_TYPE] = []
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test1", record_calls))
|
unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test1", record_calls))
|
||||||
unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor1", record_calls))
|
unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor1", record_calls))
|
||||||
unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test2", record_calls))
|
unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test2", record_calls))
|
||||||
unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor2", record_calls))
|
unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor2", record_calls))
|
||||||
# Make sure the debouncer finishes
|
# Make sure the debouncer finishes
|
||||||
await asyncio.sleep(0.1)
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
|
|
||||||
assert mqtt_client_mock.subscribe.call_count == 2
|
assert mqtt_client_mock.subscribe.call_count == 2
|
||||||
# Assert we have a 2 subscription calls with both 2 subscriptions
|
# Assert we have a 2 subscription calls with both 2 subscriptions
|
||||||
@ -2675,12 +2635,11 @@ async def test_mqtt_subscribes_and_unsubscribes_in_chunks(
|
|||||||
assert len(mqtt_client_mock.subscribe.mock_calls[1][1][0]) == 2
|
assert len(mqtt_client_mock.subscribe.mock_calls[1][1][0]) == 2
|
||||||
|
|
||||||
# Unsubscribe all topics
|
# Unsubscribe all topics
|
||||||
|
mock_debouncer.clear()
|
||||||
for task in unsub_tasks:
|
for task in unsub_tasks:
|
||||||
task()
|
task()
|
||||||
await hass.async_block_till_done()
|
|
||||||
# Make sure the debouncer finishes
|
# Make sure the debouncer finishes
|
||||||
await asyncio.sleep(0.1)
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
|
|
||||||
assert mqtt_client_mock.unsubscribe.call_count == 2
|
assert mqtt_client_mock.unsubscribe.call_count == 2
|
||||||
# Assert we have a 2 unsubscribe calls with both 2 topic
|
# Assert we have a 2 unsubscribe calls with both 2 topic
|
||||||
@ -2748,6 +2707,7 @@ async def test_message_callback_exception_gets_logged(
|
|||||||
async def test_message_partial_callback_exception_gets_logged(
|
async def test_message_partial_callback_exception_gets_logged(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
caplog: pytest.LogCaptureFixture,
|
caplog: pytest.LogCaptureFixture,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test exception raised by message handler."""
|
"""Test exception raised by message handler."""
|
||||||
@ -2765,15 +2725,13 @@ async def test_message_partial_callback_exception_gets_logged(
|
|||||||
"""Partial callback handler."""
|
"""Partial callback handler."""
|
||||||
msg_callback(msg)
|
msg_callback(msg)
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
await mqtt.async_subscribe(
|
await mqtt.async_subscribe(
|
||||||
hass, "test-topic", partial(parial_handler, bad_handler, {"some_attr"})
|
hass, "test-topic", partial(parial_handler, bad_handler, {"some_attr"})
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
await mock_debouncer.wait()
|
||||||
async_fire_mqtt_message(hass, "test-topic", "test")
|
async_fire_mqtt_message(hass, "test-topic", "test")
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
await hass.async_block_till_done()
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
await hass.async_block_till_done(wait_background_tasks=True)
|
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
"Exception in bad_handler when handling msg on 'test-topic':"
|
"Exception in bad_handler when handling msg on 'test-topic':"
|
||||||
@ -3500,6 +3458,7 @@ async def test_publish_json_from_template(
|
|||||||
|
|
||||||
async def test_subscribe_connection_status(
|
async def test_subscribe_connection_status(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test connextion status subscription."""
|
"""Test connextion status subscription."""
|
||||||
@ -3533,8 +3492,9 @@ async def test_subscribe_connection_status(
|
|||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
# Mock connect status
|
# Mock connect status
|
||||||
|
mock_debouncer.clear()
|
||||||
mqtt_client_mock.on_connect(None, None, 0, 0)
|
mqtt_client_mock.on_connect(None, None, 0, 0)
|
||||||
await hass.async_block_till_done()
|
await mock_debouncer.wait()
|
||||||
assert mqtt.is_connected(hass) is True
|
assert mqtt.is_connected(hass) is True
|
||||||
|
|
||||||
# Mock disconnect status
|
# Mock disconnect status
|
||||||
@ -3547,9 +3507,9 @@ async def test_subscribe_connection_status(
|
|||||||
unsub_async()
|
unsub_async()
|
||||||
|
|
||||||
# Mock connect status
|
# Mock connect status
|
||||||
|
mock_debouncer.clear()
|
||||||
mqtt_client_mock.on_connect(None, None, 0, 0)
|
mqtt_client_mock.on_connect(None, None, 0, 0)
|
||||||
await asyncio.sleep(0)
|
await mock_debouncer.wait()
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert mqtt.is_connected(hass) is True
|
assert mqtt.is_connected(hass) is True
|
||||||
|
|
||||||
# Check calls
|
# Check calls
|
||||||
@ -3584,7 +3544,7 @@ async def test_unload_config_entry(
|
|||||||
new_mqtt_config_entry = mqtt_config_entry
|
new_mqtt_config_entry = mqtt_config_entry
|
||||||
mqtt_client_mock.publish.assert_any_call("just_in_time", "published", 0, False)
|
mqtt_client_mock.publish.assert_any_call("just_in_time", "published", 0, False)
|
||||||
assert new_mqtt_config_entry.state is ConfigEntryState.NOT_LOADED
|
assert new_mqtt_config_entry.state is ConfigEntryState.NOT_LOADED
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
assert not hass.services.has_service(mqtt.DOMAIN, "dump")
|
assert not hass.services.has_service(mqtt.DOMAIN, "dump")
|
||||||
assert not hass.services.has_service(mqtt.DOMAIN, "publish")
|
assert not hass.services.has_service(mqtt.DOMAIN, "publish")
|
||||||
assert "No ACK from MQTT server" not in caplog.text
|
assert "No ACK from MQTT server" not in caplog.text
|
||||||
@ -4236,6 +4196,7 @@ async def test_auto_reconnect(
|
|||||||
|
|
||||||
async def test_server_sock_connect_and_disconnect(
|
async def test_server_sock_connect_and_disconnect(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
recorded_calls: list[ReceiveMessage],
|
recorded_calls: list[ReceiveMessage],
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
@ -4257,14 +4218,19 @@ async def test_server_sock_connect_and_disconnect(
|
|||||||
|
|
||||||
server.close() # mock the server closing the connection on us
|
server.close() # mock the server closing the connection on us
|
||||||
|
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls)
|
unsub = await mqtt.async_subscribe(hass, "test-topic", record_calls)
|
||||||
|
await mock_debouncer.wait()
|
||||||
|
|
||||||
mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_CONN_LOST
|
mqtt_client_mock.loop_misc.return_value = paho_mqtt.MQTT_ERR_CONN_LOST
|
||||||
mqtt_client_mock.on_socket_unregister_write(mqtt_client_mock, None, client)
|
mqtt_client_mock.on_socket_unregister_write(mqtt_client_mock, None, client)
|
||||||
mqtt_client_mock.on_socket_close(mqtt_client_mock, None, client)
|
mqtt_client_mock.on_socket_close(mqtt_client_mock, None, client)
|
||||||
mqtt_client_mock.on_disconnect(mqtt_client_mock, None, client)
|
mqtt_client_mock.on_disconnect(mqtt_client_mock, None, client)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
mock_debouncer.clear()
|
||||||
unsub()
|
unsub()
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
assert not mock_debouncer.is_set()
|
||||||
|
|
||||||
# Should have failed
|
# Should have failed
|
||||||
assert len(recorded_calls) == 0
|
assert len(recorded_calls) == 0
|
||||||
|
@ -26,15 +26,15 @@ from tests.typing import MqttMockHAClient, MqttMockPahoClient
|
|||||||
async def test_canceling_debouncer_on_shutdown(
|
async def test_canceling_debouncer_on_shutdown(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
record_calls: MessageCallbackType,
|
record_calls: MessageCallbackType,
|
||||||
|
mock_debouncer: asyncio.Event,
|
||||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test canceling the debouncer when HA shuts down."""
|
"""Test canceling the debouncer when HA shuts down."""
|
||||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
mqtt_client_mock = setup_with_birth_msg_client_mock
|
||||||
|
# Mock we are past initial setup
|
||||||
|
await mock_debouncer.wait()
|
||||||
with patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 2):
|
with patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 2):
|
||||||
await hass.async_block_till_done()
|
mock_debouncer.clear()
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
await mqtt.async_subscribe(hass, "test/state1", record_calls)
|
await mqtt.async_subscribe(hass, "test/state1", record_calls)
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=0.1))
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=0.1))
|
||||||
# Stop HA so the scheduled debouncer task will be canceled
|
# Stop HA so the scheduled debouncer task will be canceled
|
||||||
@ -47,9 +47,10 @@ async def test_canceling_debouncer_on_shutdown(
|
|||||||
await mqtt.async_subscribe(hass, "test/state4", record_calls)
|
await mqtt.async_subscribe(hass, "test/state4", record_calls)
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=0.1))
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=0.1))
|
||||||
await mqtt.async_subscribe(hass, "test/state5", record_calls)
|
await mqtt.async_subscribe(hass, "test/state5", record_calls)
|
||||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=0.1))
|
async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
# Assert the debouncer subscribe job was not executed
|
||||||
|
assert not mock_debouncer.is_set()
|
||||||
mqtt_client_mock.subscribe.assert_not_called()
|
mqtt_client_mock.subscribe.assert_not_called()
|
||||||
|
|
||||||
# Note thet the broker connection will not be disconnected gracefully
|
# Note thet the broker connection will not be disconnected gracefully
|
||||||
|
Loading…
x
Reference in New Issue
Block a user