mirror of
https://github.com/home-assistant/core.git
synced 2025-07-14 08:47:10 +00:00
Split mqtt subscribe and unsubscribe calls to smaller chunks (#118035)
This commit is contained in:
parent
01f3a5a97c
commit
fa1ef8b0cf
@ -34,6 +34,7 @@ from homeassistant.helpers.start import async_at_started
|
|||||||
from homeassistant.helpers.typing import ConfigType
|
from homeassistant.helpers.typing import ConfigType
|
||||||
from homeassistant.loader import bind_hass
|
from homeassistant.loader import bind_hass
|
||||||
from homeassistant.util.async_ import create_eager_task
|
from homeassistant.util.async_ import create_eager_task
|
||||||
|
from homeassistant.util.collection import chunked_or_all
|
||||||
from homeassistant.util.logging import catch_log_exception
|
from homeassistant.util.logging import catch_log_exception
|
||||||
|
|
||||||
from .const import (
|
from .const import (
|
||||||
@ -100,6 +101,9 @@ UNSUBSCRIBE_COOLDOWN = 0.1
|
|||||||
TIMEOUT_ACK = 10
|
TIMEOUT_ACK = 10
|
||||||
RECONNECT_INTERVAL_SECONDS = 10
|
RECONNECT_INTERVAL_SECONDS = 10
|
||||||
|
|
||||||
|
MAX_SUBSCRIBES_PER_CALL = 500
|
||||||
|
MAX_UNSUBSCRIBES_PER_CALL = 500
|
||||||
|
|
||||||
type SocketType = socket.socket | ssl.SSLSocket | mqtt.WebsocketWrapper | Any
|
type SocketType = socket.socket | ssl.SSLSocket | mqtt.WebsocketWrapper | Any
|
||||||
|
|
||||||
type SubscribePayloadType = str | bytes # Only bytes if encoding is None
|
type SubscribePayloadType = str | bytes # Only bytes if encoding is None
|
||||||
@ -905,11 +909,15 @@ class MQTT:
|
|||||||
self._pending_subscriptions = {}
|
self._pending_subscriptions = {}
|
||||||
|
|
||||||
subscription_list = list(subscriptions.items())
|
subscription_list = list(subscriptions.items())
|
||||||
result, mid = self._mqttc.subscribe(subscription_list)
|
|
||||||
|
for chunk in chunked_or_all(subscription_list, MAX_SUBSCRIBES_PER_CALL):
|
||||||
|
result, mid = self._mqttc.subscribe(chunk)
|
||||||
|
|
||||||
if _LOGGER.isEnabledFor(logging.DEBUG):
|
if _LOGGER.isEnabledFor(logging.DEBUG):
|
||||||
for topic, qos in subscriptions.items():
|
for topic, qos in subscriptions.items():
|
||||||
_LOGGER.debug("Subscribing to %s, mid: %s, qos: %s", topic, mid, qos)
|
_LOGGER.debug(
|
||||||
|
"Subscribing to %s, mid: %s, qos: %s", topic, mid, qos
|
||||||
|
)
|
||||||
self._last_subscribe = time.monotonic()
|
self._last_subscribe = time.monotonic()
|
||||||
|
|
||||||
if result == 0:
|
if result == 0:
|
||||||
@ -925,10 +933,11 @@ class MQTT:
|
|||||||
topics = list(self._pending_unsubscribes)
|
topics = list(self._pending_unsubscribes)
|
||||||
self._pending_unsubscribes = set()
|
self._pending_unsubscribes = set()
|
||||||
|
|
||||||
result, mid = self._mqttc.unsubscribe(topics)
|
for chunk in chunked_or_all(topics, MAX_UNSUBSCRIBES_PER_CALL):
|
||||||
|
result, mid = self._mqttc.unsubscribe(chunk)
|
||||||
_raise_on_error(result)
|
_raise_on_error(result)
|
||||||
if _LOGGER.isEnabledFor(logging.DEBUG):
|
if _LOGGER.isEnabledFor(logging.DEBUG):
|
||||||
for topic in topics:
|
for topic in chunk:
|
||||||
_LOGGER.debug("Unsubscribing from %s, mid: %s", topic, mid)
|
_LOGGER.debug("Unsubscribing from %s, mid: %s", topic, mid)
|
||||||
|
|
||||||
await self._async_wait_for_mid(mid)
|
await self._async_wait_for_mid(mid)
|
||||||
|
@ -44,7 +44,7 @@ from homeassistant.const import (
|
|||||||
UnitOfTemperature,
|
UnitOfTemperature,
|
||||||
)
|
)
|
||||||
import homeassistant.core as ha
|
import homeassistant.core as ha
|
||||||
from homeassistant.core import CoreState, HomeAssistant, callback
|
from homeassistant.core import CALLBACK_TYPE, CoreState, HomeAssistant, callback
|
||||||
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
|
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
|
||||||
from homeassistant.helpers import device_registry as dr, entity_registry as er, template
|
from homeassistant.helpers import device_registry as dr, entity_registry as er, template
|
||||||
from homeassistant.helpers.entity import Entity
|
from homeassistant.helpers.entity import Entity
|
||||||
@ -2816,6 +2816,59 @@ async def test_mqtt_subscribes_in_single_call(
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"mqtt_config_entry_data",
|
||||||
|
[
|
||||||
|
{
|
||||||
|
mqtt.CONF_BROKER: "mock-broker",
|
||||||
|
mqtt.CONF_BIRTH_MESSAGE: {},
|
||||||
|
mqtt.CONF_DISCOVERY: False,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@patch("homeassistant.components.mqtt.client.MAX_SUBSCRIBES_PER_CALL", 2)
|
||||||
|
@patch("homeassistant.components.mqtt.client.MAX_UNSUBSCRIBES_PER_CALL", 2)
|
||||||
|
@patch("homeassistant.components.mqtt.client.SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
|
@patch("homeassistant.components.mqtt.client.INITIAL_SUBSCRIBE_COOLDOWN", 0.0)
|
||||||
|
async def test_mqtt_subscribes_and_unsubscribes_in_chunks(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||||
|
record_calls: MessageCallbackType,
|
||||||
|
) -> None:
|
||||||
|
"""Test chunked client subscriptions."""
|
||||||
|
mqtt_mock = await mqtt_mock_entry()
|
||||||
|
# Fake that the client is connected
|
||||||
|
mqtt_mock().connected = True
|
||||||
|
|
||||||
|
mqtt_client_mock.subscribe.reset_mock()
|
||||||
|
unsub_tasks: list[CALLBACK_TYPE] = []
|
||||||
|
unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test1", record_calls))
|
||||||
|
unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor1", record_calls))
|
||||||
|
unsub_tasks.append(await mqtt.async_subscribe(hass, "topic/test2", record_calls))
|
||||||
|
unsub_tasks.append(await mqtt.async_subscribe(hass, "home/sensor2", record_calls))
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
# Make sure the debouncer finishes
|
||||||
|
await asyncio.sleep(0.2)
|
||||||
|
|
||||||
|
assert mqtt_client_mock.subscribe.call_count == 2
|
||||||
|
# Assert we have a 2 subscription calls with both 2 subscriptions
|
||||||
|
assert len(mqtt_client_mock.subscribe.mock_calls[0][1][0]) == 2
|
||||||
|
assert len(mqtt_client_mock.subscribe.mock_calls[1][1][0]) == 2
|
||||||
|
|
||||||
|
# Unsubscribe all topics
|
||||||
|
for task in unsub_tasks:
|
||||||
|
task()
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
# Make sure the debouncer finishes
|
||||||
|
await asyncio.sleep(0.2)
|
||||||
|
|
||||||
|
assert mqtt_client_mock.unsubscribe.call_count == 2
|
||||||
|
# Assert we have a 2 unsubscribe calls with both 2 topic
|
||||||
|
assert len(mqtt_client_mock.unsubscribe.mock_calls[0][1][0]) == 2
|
||||||
|
assert len(mqtt_client_mock.unsubscribe.mock_calls[1][1][0]) == 2
|
||||||
|
|
||||||
|
|
||||||
async def test_default_entry_setting_are_applied(
|
async def test_default_entry_setting_are_applied(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
device_registry: dr.DeviceRegistry,
|
device_registry: dr.DeviceRegistry,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user