mirror of
https://github.com/home-assistant/core.git
synced 2025-07-18 18:57:06 +00:00
Fix creating multiple ElkM1 systems with TLS 1.2 (#81627)
fixes https://github.com/home-assistant/core/issues/81516
This commit is contained in:
parent
3184c8a526
commit
2bea77549d
@ -7,11 +7,11 @@ import logging
|
|||||||
import re
|
import re
|
||||||
from types import MappingProxyType
|
from types import MappingProxyType
|
||||||
from typing import Any, cast
|
from typing import Any, cast
|
||||||
from urllib.parse import urlparse
|
|
||||||
|
|
||||||
import async_timeout
|
import async_timeout
|
||||||
from elkm1_lib.elements import Element
|
from elkm1_lib.elements import Element
|
||||||
from elkm1_lib.elk import Elk
|
from elkm1_lib.elk import Elk
|
||||||
|
from elkm1_lib.util import parse_url
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
||||||
@ -96,6 +96,11 @@ SET_TIME_SERVICE_SCHEMA = vol.Schema(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def hostname_from_url(url: str) -> str:
|
||||||
|
"""Return the hostname from a url."""
|
||||||
|
return parse_url(url)[1]
|
||||||
|
|
||||||
|
|
||||||
def _host_validator(config: dict[str, str]) -> dict[str, str]:
|
def _host_validator(config: dict[str, str]) -> dict[str, str]:
|
||||||
"""Validate that a host is properly configured."""
|
"""Validate that a host is properly configured."""
|
||||||
if config[CONF_HOST].startswith("elks://"):
|
if config[CONF_HOST].startswith("elks://"):
|
||||||
@ -231,7 +236,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
"""Set up Elk-M1 Control from a config entry."""
|
"""Set up Elk-M1 Control from a config entry."""
|
||||||
conf: MappingProxyType[str, Any] = entry.data
|
conf: MappingProxyType[str, Any] = entry.data
|
||||||
|
|
||||||
host = urlparse(entry.data[CONF_HOST]).hostname
|
host = hostname_from_url(entry.data[CONF_HOST])
|
||||||
|
|
||||||
_LOGGER.debug("Setting up elkm1 %s", conf["host"])
|
_LOGGER.debug("Setting up elkm1 %s", conf["host"])
|
||||||
|
|
||||||
|
@ -4,7 +4,6 @@ from __future__ import annotations
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from urllib.parse import urlparse
|
|
||||||
|
|
||||||
from elkm1_lib.discovery import ElkSystem
|
from elkm1_lib.discovery import ElkSystem
|
||||||
from elkm1_lib.elk import Elk
|
from elkm1_lib.elk import Elk
|
||||||
@ -26,7 +25,7 @@ from homeassistant.helpers.typing import DiscoveryInfoType
|
|||||||
from homeassistant.util import slugify
|
from homeassistant.util import slugify
|
||||||
from homeassistant.util.network import is_ip_address
|
from homeassistant.util.network import is_ip_address
|
||||||
|
|
||||||
from . import async_wait_for_elk_to_sync
|
from . import async_wait_for_elk_to_sync, hostname_from_url
|
||||||
from .const import CONF_AUTO_CONFIGURE, DISCOVER_SCAN_TIMEOUT, DOMAIN, LOGIN_TIMEOUT
|
from .const import CONF_AUTO_CONFIGURE, DISCOVER_SCAN_TIMEOUT, DOMAIN, LOGIN_TIMEOUT
|
||||||
from .discovery import (
|
from .discovery import (
|
||||||
_short_mac,
|
_short_mac,
|
||||||
@ -170,7 +169,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||||||
for entry in self._async_current_entries(include_ignore=False):
|
for entry in self._async_current_entries(include_ignore=False):
|
||||||
if (
|
if (
|
||||||
entry.unique_id == mac
|
entry.unique_id == mac
|
||||||
or urlparse(entry.data[CONF_HOST]).hostname == host
|
or hostname_from_url(entry.data[CONF_HOST]) == host
|
||||||
):
|
):
|
||||||
if async_update_entry_from_discovery(self.hass, entry, device):
|
if async_update_entry_from_discovery(self.hass, entry, device):
|
||||||
self.hass.async_create_task(
|
self.hass.async_create_task(
|
||||||
@ -214,7 +213,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||||||
|
|
||||||
current_unique_ids = self._async_current_ids()
|
current_unique_ids = self._async_current_ids()
|
||||||
current_hosts = {
|
current_hosts = {
|
||||||
urlparse(entry.data[CONF_HOST]).hostname
|
hostname_from_url(entry.data[CONF_HOST])
|
||||||
for entry in self._async_current_entries(include_ignore=False)
|
for entry in self._async_current_entries(include_ignore=False)
|
||||||
}
|
}
|
||||||
discovered_devices = await async_discover_devices(
|
discovered_devices = await async_discover_devices(
|
||||||
@ -344,7 +343,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||||||
if self._url_already_configured(url):
|
if self._url_already_configured(url):
|
||||||
return self.async_abort(reason="address_already_configured")
|
return self.async_abort(reason="address_already_configured")
|
||||||
|
|
||||||
host = urlparse(url).hostname
|
host = hostname_from_url(url)
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
"Importing is trying to fill unique id from discovery for %s", host
|
"Importing is trying to fill unique id from discovery for %s", host
|
||||||
)
|
)
|
||||||
@ -367,10 +366,10 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||||||
def _url_already_configured(self, url: str) -> bool:
|
def _url_already_configured(self, url: str) -> bool:
|
||||||
"""See if we already have a elkm1 matching user input configured."""
|
"""See if we already have a elkm1 matching user input configured."""
|
||||||
existing_hosts = {
|
existing_hosts = {
|
||||||
urlparse(entry.data[CONF_HOST]).hostname
|
hostname_from_url(entry.data[CONF_HOST])
|
||||||
for entry in self._async_current_entries()
|
for entry in self._async_current_entries()
|
||||||
}
|
}
|
||||||
return urlparse(url).hostname in existing_hosts
|
return hostname_from_url(url) in existing_hosts
|
||||||
|
|
||||||
|
|
||||||
class InvalidAuth(exceptions.HomeAssistantError):
|
class InvalidAuth(exceptions.HomeAssistantError):
|
||||||
|
@ -1454,3 +1454,149 @@ async def test_multiple_instances_with_discovery(hass):
|
|||||||
"password": "",
|
"password": "",
|
||||||
}
|
}
|
||||||
assert len(mock_setup_entry.mock_calls) == 1
|
assert len(mock_setup_entry.mock_calls) == 1
|
||||||
|
|
||||||
|
|
||||||
|
async def test_multiple_instances_with_tls_v12(hass):
|
||||||
|
"""Test we can setup a secure elk with tls v1_2."""
|
||||||
|
|
||||||
|
elk_discovery_1 = ElkSystem("aa:bb:cc:dd:ee:ff", "127.0.0.1", 2601)
|
||||||
|
elk_discovery_2 = ElkSystem("aa:bb:cc:dd:ee:fe", "127.0.0.2", 2601)
|
||||||
|
|
||||||
|
with _patch_discovery(device=elk_discovery_1):
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert result["type"] == "form"
|
||||||
|
assert not result["errors"]
|
||||||
|
assert result["step_id"] == "user"
|
||||||
|
|
||||||
|
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||||
|
|
||||||
|
with _patch_elk(elk=mocked_elk):
|
||||||
|
result2 = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"],
|
||||||
|
{"device": elk_discovery_1.mac_address},
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert result2["type"] == "form"
|
||||||
|
assert not result["errors"]
|
||||||
|
assert result2["step_id"] == "discovered_connection"
|
||||||
|
with _patch_discovery(device=elk_discovery_1), _patch_elk(elk=mocked_elk), patch(
|
||||||
|
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||||
|
) as mock_setup, patch(
|
||||||
|
"homeassistant.components.elkm1.async_setup_entry",
|
||||||
|
return_value=True,
|
||||||
|
) as mock_setup_entry:
|
||||||
|
result3 = await hass.config_entries.flow.async_configure(
|
||||||
|
result2["flow_id"],
|
||||||
|
{
|
||||||
|
"protocol": "TLS 1.2",
|
||||||
|
"username": "test-username",
|
||||||
|
"password": "test-password",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert result3["type"] == "create_entry"
|
||||||
|
assert result3["title"] == "ElkM1 ddeeff"
|
||||||
|
assert result3["data"] == {
|
||||||
|
"auto_configure": True,
|
||||||
|
"host": "elksv1_2://127.0.0.1",
|
||||||
|
"password": "test-password",
|
||||||
|
"prefix": "",
|
||||||
|
"username": "test-username",
|
||||||
|
}
|
||||||
|
assert len(mock_setup.mock_calls) == 1
|
||||||
|
assert len(mock_setup_entry.mock_calls) == 1
|
||||||
|
|
||||||
|
# Now try to add another instance with the different discovery info
|
||||||
|
with _patch_discovery(device=elk_discovery_2):
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert result["type"] == "form"
|
||||||
|
assert not result["errors"]
|
||||||
|
assert result["step_id"] == "user"
|
||||||
|
|
||||||
|
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||||
|
|
||||||
|
with _patch_elk(elk=mocked_elk):
|
||||||
|
result2 = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"],
|
||||||
|
{"device": elk_discovery_2.mac_address},
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
with _patch_discovery(device=elk_discovery_2), _patch_elk(elk=mocked_elk), patch(
|
||||||
|
"homeassistant.components.elkm1.async_setup_entry",
|
||||||
|
return_value=True,
|
||||||
|
) as mock_setup_entry:
|
||||||
|
result3 = await hass.config_entries.flow.async_configure(
|
||||||
|
result2["flow_id"],
|
||||||
|
{
|
||||||
|
"protocol": "TLS 1.2",
|
||||||
|
"username": "test-username",
|
||||||
|
"password": "test-password",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert result3["type"] == "create_entry"
|
||||||
|
assert result3["title"] == "ElkM1 ddeefe"
|
||||||
|
assert result3["data"] == {
|
||||||
|
"auto_configure": True,
|
||||||
|
"host": "elksv1_2://127.0.0.2",
|
||||||
|
"password": "test-password",
|
||||||
|
"prefix": "ddeefe",
|
||||||
|
"username": "test-username",
|
||||||
|
}
|
||||||
|
assert len(mock_setup_entry.mock_calls) == 1
|
||||||
|
|
||||||
|
# Finally, try to add another instance manually with no discovery info
|
||||||
|
|
||||||
|
with _patch_discovery(no_device=True):
|
||||||
|
result = await hass.config_entries.flow.async_init(
|
||||||
|
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert result["type"] == "form"
|
||||||
|
assert result["errors"] == {}
|
||||||
|
assert result["step_id"] == "manual_connection"
|
||||||
|
|
||||||
|
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||||
|
|
||||||
|
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk), patch(
|
||||||
|
"homeassistant.components.elkm1.async_setup_entry",
|
||||||
|
return_value=True,
|
||||||
|
) as mock_setup_entry:
|
||||||
|
result2 = await hass.config_entries.flow.async_configure(
|
||||||
|
result["flow_id"],
|
||||||
|
{
|
||||||
|
"protocol": "TLS 1.2",
|
||||||
|
"address": "1.2.3.4",
|
||||||
|
"prefix": "guest_house",
|
||||||
|
"password": "test-password",
|
||||||
|
"username": "test-username",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
import pprint
|
||||||
|
|
||||||
|
pprint.pprint(result2)
|
||||||
|
assert result2["type"] == "create_entry"
|
||||||
|
assert result2["title"] == "guest_house"
|
||||||
|
assert result2["data"] == {
|
||||||
|
"auto_configure": True,
|
||||||
|
"host": "elksv1_2://1.2.3.4",
|
||||||
|
"prefix": "guest_house",
|
||||||
|
"password": "test-password",
|
||||||
|
"username": "test-username",
|
||||||
|
}
|
||||||
|
assert len(mock_setup_entry.mock_calls) == 1
|
||||||
|
Loading…
x
Reference in New Issue
Block a user