mirror of
https://github.com/home-assistant/core.git
synced 2025-07-12 15:57:06 +00:00
Set clean_start=True on connect to MQTT broker (#136026)
* Addresses #135443: Set on connect. * Make clean start implementation compatible with v2 API * Add tests * Do not pass default value for `clean_start` on_connect * Revert "Do not pass default value for `clean_start` on_connect" This reverts commit 75806736cf511a6d6b6496454843de34f05f7758. * Use partial top pass kwargs to mqtt client connect --------- Co-authored-by: Jan Bouwhuis <jbouwh@users.noreply.github.com> Co-authored-by: jbouwh <jan@jbsoft.nl>
This commit is contained in:
parent
6613b46071
commit
f8ffbf0506
@ -298,12 +298,15 @@ class MqttClientSetup:
|
|||||||
from .async_client import AsyncMQTTClient
|
from .async_client import AsyncMQTTClient
|
||||||
|
|
||||||
config = self._config
|
config = self._config
|
||||||
|
clean_session: bool | None = None
|
||||||
if (protocol := config.get(CONF_PROTOCOL, DEFAULT_PROTOCOL)) == PROTOCOL_31:
|
if (protocol := config.get(CONF_PROTOCOL, DEFAULT_PROTOCOL)) == PROTOCOL_31:
|
||||||
proto = mqtt.MQTTv31
|
proto = mqtt.MQTTv31
|
||||||
|
clean_session = True
|
||||||
elif protocol == PROTOCOL_5:
|
elif protocol == PROTOCOL_5:
|
||||||
proto = mqtt.MQTTv5
|
proto = mqtt.MQTTv5
|
||||||
else:
|
else:
|
||||||
proto = mqtt.MQTTv311
|
proto = mqtt.MQTTv311
|
||||||
|
clean_session = True
|
||||||
|
|
||||||
if (client_id := config.get(CONF_CLIENT_ID)) is None:
|
if (client_id := config.get(CONF_CLIENT_ID)) is None:
|
||||||
# PAHO MQTT relies on the MQTT server to generate random client IDs.
|
# PAHO MQTT relies on the MQTT server to generate random client IDs.
|
||||||
@ -313,6 +316,19 @@ class MqttClientSetup:
|
|||||||
self._client = AsyncMQTTClient(
|
self._client = AsyncMQTTClient(
|
||||||
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
|
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
|
||||||
client_id=client_id,
|
client_id=client_id,
|
||||||
|
# See: https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html
|
||||||
|
# clean_session (bool defaults to None)
|
||||||
|
# a boolean that determines the client type.
|
||||||
|
# If True, the broker will remove all information about this client when it
|
||||||
|
# disconnects. If False, the client is a persistent client and subscription
|
||||||
|
# information and queued messages will be retained when the client
|
||||||
|
# disconnects. Note that a client will never discard its own outgoing
|
||||||
|
# messages on disconnect. Calling connect() or reconnect() will cause the
|
||||||
|
# messages to be resent. Use reinitialise() to reset a client to its
|
||||||
|
# original state. The clean_session argument only applies to MQTT versions
|
||||||
|
# v3.1.1 and v3.1. It is not accepted if the MQTT version is v5.0 - use the
|
||||||
|
# clean_start argument on connect() instead.
|
||||||
|
clean_session=clean_session,
|
||||||
protocol=proto,
|
protocol=proto,
|
||||||
transport=transport, # type: ignore[arg-type]
|
transport=transport, # type: ignore[arg-type]
|
||||||
reconnect_on_failure=False,
|
reconnect_on_failure=False,
|
||||||
@ -371,6 +387,7 @@ class MQTT:
|
|||||||
self.loop = hass.loop
|
self.loop = hass.loop
|
||||||
self.config_entry = config_entry
|
self.config_entry = config_entry
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
|
self.is_mqttv5 = conf.get(CONF_PROTOCOL, DEFAULT_PROTOCOL) == PROTOCOL_5
|
||||||
|
|
||||||
self._simple_subscriptions: defaultdict[str, set[Subscription]] = defaultdict(
|
self._simple_subscriptions: defaultdict[str, set[Subscription]] = defaultdict(
|
||||||
set
|
set
|
||||||
@ -652,14 +669,25 @@ class MQTT:
|
|||||||
result: int | None = None
|
result: int | None = None
|
||||||
self._available_future = client_available
|
self._available_future = client_available
|
||||||
self._should_reconnect = True
|
self._should_reconnect = True
|
||||||
|
connect_partial = partial(
|
||||||
|
self._mqttc.connect,
|
||||||
|
host=self.conf[CONF_BROKER],
|
||||||
|
port=self.conf.get(CONF_PORT, DEFAULT_PORT),
|
||||||
|
keepalive=self.conf.get(CONF_KEEPALIVE, DEFAULT_KEEPALIVE),
|
||||||
|
# See:
|
||||||
|
# https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html
|
||||||
|
# `clean_start` (bool) – (MQTT v5.0 only) `True`, `False` or
|
||||||
|
# `MQTT_CLEAN_START_FIRST_ONLY`. Sets the MQTT v5.0 clean_start flag
|
||||||
|
# always, never or on the first successful connect only,
|
||||||
|
# respectively. MQTT session data (such as outstanding messages and
|
||||||
|
# subscriptions) is cleared on successful connect when the
|
||||||
|
# clean_start flag is set. For MQTT v3.1.1, the clean_session
|
||||||
|
# argument of Client should be used for similar result.
|
||||||
|
clean_start=True if self.is_mqttv5 else mqtt.MQTT_CLEAN_START_FIRST_ONLY,
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
async with self._connection_lock, self._async_connect_in_executor():
|
async with self._connection_lock, self._async_connect_in_executor():
|
||||||
result = await self.hass.async_add_executor_job(
|
result = await self.hass.async_add_executor_job(connect_partial)
|
||||||
self._mqttc.connect,
|
|
||||||
self.conf[CONF_BROKER],
|
|
||||||
self.conf.get(CONF_PORT, DEFAULT_PORT),
|
|
||||||
self.conf.get(CONF_KEEPALIVE, DEFAULT_KEEPALIVE),
|
|
||||||
)
|
|
||||||
except (OSError, mqtt.WebsocketConnectionError) as err:
|
except (OSError, mqtt.WebsocketConnectionError) as err:
|
||||||
_LOGGER.error("Failed to connect to MQTT server due to exception: %s", err)
|
_LOGGER.error("Failed to connect to MQTT server due to exception: %s", err)
|
||||||
self._async_connection_result(False)
|
self._async_connection_result(False)
|
||||||
|
@ -1271,7 +1271,7 @@ async def test_publish_error(
|
|||||||
with patch(
|
with patch(
|
||||||
"homeassistant.components.mqtt.async_client.AsyncMQTTClient"
|
"homeassistant.components.mqtt.async_client.AsyncMQTTClient"
|
||||||
) as mock_client:
|
) as mock_client:
|
||||||
mock_client().connect = lambda *args: 1
|
mock_client().connect = lambda **kwargs: 1
|
||||||
mock_client().publish().rc = 1
|
mock_client().publish().rc = 1
|
||||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||||
with pytest.raises(HomeAssistantError):
|
with pytest.raises(HomeAssistantError):
|
||||||
@ -1330,7 +1330,7 @@ async def test_handle_message_callback(
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
("mqtt_config_entry_data", "protocol"),
|
("mqtt_config_entry_data", "protocol", "clean_session"),
|
||||||
[
|
[
|
||||||
(
|
(
|
||||||
{
|
{
|
||||||
@ -1338,6 +1338,7 @@ async def test_handle_message_callback(
|
|||||||
CONF_PROTOCOL: "3.1",
|
CONF_PROTOCOL: "3.1",
|
||||||
},
|
},
|
||||||
3,
|
3,
|
||||||
|
True,
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
{
|
{
|
||||||
@ -1345,6 +1346,7 @@ async def test_handle_message_callback(
|
|||||||
CONF_PROTOCOL: "3.1.1",
|
CONF_PROTOCOL: "3.1.1",
|
||||||
},
|
},
|
||||||
4,
|
4,
|
||||||
|
True,
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
{
|
{
|
||||||
@ -1352,22 +1354,72 @@ async def test_handle_message_callback(
|
|||||||
CONF_PROTOCOL: "5",
|
CONF_PROTOCOL: "5",
|
||||||
},
|
},
|
||||||
5,
|
5,
|
||||||
|
None,
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
|
ids=["v3.1", "v3.1.1", "v5"],
|
||||||
)
|
)
|
||||||
async def test_setup_mqtt_client_protocol(
|
async def test_setup_mqtt_client_clean_session_and_protocol(
|
||||||
mqtt_mock_entry: MqttMockHAClientGenerator, protocol: int
|
hass: HomeAssistant,
|
||||||
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||||
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
|
protocol: int,
|
||||||
|
clean_session: bool | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test MQTT client protocol setup."""
|
"""Test MQTT client clean_session and protocol setup."""
|
||||||
with patch(
|
with patch(
|
||||||
"homeassistant.components.mqtt.async_client.AsyncMQTTClient"
|
"homeassistant.components.mqtt.async_client.AsyncMQTTClient"
|
||||||
) as mock_client:
|
) as mock_client:
|
||||||
await mqtt_mock_entry()
|
await mqtt_mock_entry()
|
||||||
|
|
||||||
|
# check if clean_session was correctly
|
||||||
|
assert mock_client.call_args[1]["clean_session"] == clean_session
|
||||||
|
|
||||||
# check if protocol setup was correctly
|
# check if protocol setup was correctly
|
||||||
assert mock_client.call_args[1]["protocol"] == protocol
|
assert mock_client.call_args[1]["protocol"] == protocol
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("mqtt_config_entry_data", "connect_args"),
|
||||||
|
[
|
||||||
|
(
|
||||||
|
{
|
||||||
|
mqtt.CONF_BROKER: "mock-broker",
|
||||||
|
CONF_PROTOCOL: "3.1",
|
||||||
|
},
|
||||||
|
call(host="mock-broker", port=1883, keepalive=60, clean_start=3),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
{
|
||||||
|
mqtt.CONF_BROKER: "mock-broker",
|
||||||
|
CONF_PROTOCOL: "3.1.1",
|
||||||
|
},
|
||||||
|
call(host="mock-broker", port=1883, keepalive=60, clean_start=3),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
{
|
||||||
|
mqtt.CONF_BROKER: "mock-broker",
|
||||||
|
CONF_PROTOCOL: "5",
|
||||||
|
},
|
||||||
|
call(host="mock-broker", port=1883, keepalive=60, clean_start=True),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
ids=["v3.1", "v3.1.1", "v5"],
|
||||||
|
)
|
||||||
|
async def test_setup_mqtt_client_clean_start(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||||
|
mqtt_client_mock: MqttMockPahoClient,
|
||||||
|
connect_args: tuple[Any],
|
||||||
|
) -> None:
|
||||||
|
"""Test MQTT client protocol connects with `clean_start` set correctly."""
|
||||||
|
await mqtt_mock_entry()
|
||||||
|
|
||||||
|
# check if clean_start was set correctly
|
||||||
|
assert len(mqtt_client_mock.connect.mock_calls) == 1
|
||||||
|
assert mqtt_client_mock.connect.mock_calls[0] == connect_args
|
||||||
|
|
||||||
|
|
||||||
@patch("homeassistant.components.mqtt.client.TIMEOUT_ACK", 0.2)
|
@patch("homeassistant.components.mqtt.client.TIMEOUT_ACK", 0.2)
|
||||||
async def test_handle_mqtt_timeout_on_callback(
|
async def test_handle_mqtt_timeout_on_callback(
|
||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_debouncer: asyncio.Event
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_debouncer: asyncio.Event
|
||||||
|
Loading…
x
Reference in New Issue
Block a user