Use dataclass for ZeroconfServiceInfo (#60206)

Co-authored-by: epenet <epenet@users.noreply.github.com>
This commit is contained in:
epenet 2021-11-23 22:59:36 +01:00 committed by GitHub
parent 2de0a14db0
commit 44611d7e26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 72 additions and 54 deletions

View File

@ -186,7 +186,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self, discovery_info: zeroconf.ZeroconfServiceInfo self, discovery_info: zeroconf.ZeroconfServiceInfo
) -> FlowResult: ) -> FlowResult:
"""Handle zeroconf discovery.""" """Handle zeroconf discovery."""
if not discovery_info.get(zeroconf.ATTR_NAME, "").startswith("Bosch SHC"): if not discovery_info.name.startswith("Bosch SHC"):
return self.async_abort(reason="not_bosch_shc") return self.async_abort(reason="not_bosch_shc")
try: try:

View File

@ -1,7 +1,7 @@
"""Config flow for Modern Forms.""" """Config flow for Modern Forms."""
from __future__ import annotations from __future__ import annotations
from typing import Any, cast from typing import Any
from aiomodernforms import ModernFormsConnectionError, ModernFormsDevice from aiomodernforms import ModernFormsConnectionError, ModernFormsDevice
import voluptuous as vol import voluptuous as vol
@ -43,7 +43,7 @@ class ModernFormsFlowHandler(ConfigFlow, domain=DOMAIN):
) )
# Prepare configuration flow # Prepare configuration flow
return await self._handle_config_flow(cast(dict, discovery_info), True) return await self._handle_config_flow({}, True)
async def async_step_zeroconf_confirm( async def async_step_zeroconf_confirm(
self, user_input: dict[str, Any] | None = None self, user_input: dict[str, Any] | None = None

View File

@ -107,7 +107,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
user_input.update( user_input.update(
{ {
CONF_HOST: self.discovery_info[CONF_HOST], CONF_HOST: self.discovery_info[CONF_HOST],
CONF_PORT: self.discovery_info.get(CONF_PORT, DEFAULT_PORT), CONF_PORT: self.discovery_info.port or DEFAULT_PORT,
} }
) )
info, errors = await self._async_validate_or_error(user_input) info, errors = await self._async_validate_or_error(user_input)

View File

@ -201,9 +201,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self._abort_if_unique_id_configured({CONF_HOST: host}) self._abort_if_unique_id_configured({CONF_HOST: host})
self.host = host self.host = host
self.context["title_placeholders"] = { self.context["title_placeholders"] = {"name": discovery_info.name.split(".")[0]}
"name": discovery_info.get("name", "").split(".")[0]
}
if get_info_auth(self.info): if get_info_auth(self.info):
return await self.async_step_credentials() return await self.async_step_credentials()

View File

@ -1,7 +1,7 @@
"""Config flow to configure the WLED integration.""" """Config flow to configure the WLED integration."""
from __future__ import annotations from __future__ import annotations
from typing import Any, cast from typing import Any
import voluptuous as vol import voluptuous as vol
from wled import WLED, WLEDConnectionError from wled import WLED, WLEDConnectionError
@ -57,7 +57,7 @@ class WLEDFlowHandler(ConfigFlow, domain=DOMAIN):
) )
# Prepare configuration flow # Prepare configuration flow
return await self._handle_config_flow(cast(dict, discovery_info), True) return await self._handle_config_flow({}, True)
async def async_step_zeroconf_confirm( async def async_step_zeroconf_confirm(
self, user_input: dict[str, Any] | None = None self, user_input: dict[str, Any] | None = None

View File

@ -3,12 +3,13 @@ from __future__ import annotations
import asyncio import asyncio
from contextlib import suppress from contextlib import suppress
from dataclasses import dataclass
import fnmatch import fnmatch
from ipaddress import IPv4Address, IPv6Address, ip_address from ipaddress import IPv4Address, IPv6Address, ip_address
import logging import logging
import socket import socket
import sys import sys
from typing import Any, Final, TypedDict, cast from typing import Any, Final, cast
import voluptuous as vol import voluptuous as vol
from zeroconf import InterfaceChoice, IPVersion, ServiceStateChange from zeroconf import InterfaceChoice, IPVersion, ServiceStateChange
@ -25,8 +26,10 @@ from homeassistant.const import (
__version__, __version__,
) )
from homeassistant.core import Event, HomeAssistant, callback from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.data_entry_flow import BaseServiceInfo
from homeassistant.helpers import discovery_flow from homeassistant.helpers import discovery_flow
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.frame import report
from homeassistant.helpers.network import NoURLAvailableError, get_url from homeassistant.helpers.network import NoURLAvailableError, get_url
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_homekit, async_get_zeroconf, bind_hass from homeassistant.loader import async_get_homekit, async_get_zeroconf, bind_hass
@ -89,7 +92,8 @@ CONFIG_SCHEMA = vol.Schema(
) )
class ZeroconfServiceInfo(TypedDict): @dataclass
class ZeroconfServiceInfo(BaseServiceInfo):
"""Prepared info from mDNS entries.""" """Prepared info from mDNS entries."""
host: str host: str
@ -99,6 +103,25 @@ class ZeroconfServiceInfo(TypedDict):
name: str name: str
properties: dict[str, Any] properties: dict[str, Any]
# Used to prevent log flooding. To be removed in 2022.6
_warning_logged: bool = False
def __getitem__(self, name: str) -> Any:
"""
Allow property access by name for compatibility reason.
Deprecated, and will be removed in version 2022.6.
"""
if not self._warning_logged:
report(
f"accessed discovery_info['{name}'] instead of discovery_info.{name}; this will fail in version 2022.6",
exclude_integrations={"zeroconf"},
error_if_core=False,
level=logging.DEBUG,
)
self._warning_logged = True
return getattr(self, name)
@bind_hass @bind_hass
async def async_get_instance(hass: HomeAssistant) -> HaZeroconf: async def async_get_instance(hass: HomeAssistant) -> HaZeroconf:
@ -360,7 +383,7 @@ class ZeroconfDiscovery:
# If we can handle it as a HomeKit discovery, we do that here. # If we can handle it as a HomeKit discovery, we do that here.
if service_type in HOMEKIT_TYPES: if service_type in HOMEKIT_TYPES:
props = info[ATTR_PROPERTIES] props = info.properties
if domain := async_get_homekit_discovery_domain(self.homekit_models, props): if domain := async_get_homekit_discovery_domain(self.homekit_models, props):
discovery_flow.async_create_flow( discovery_flow.async_create_flow(
self.hass, domain, {"source": config_entries.SOURCE_HOMEKIT}, info self.hass, domain, {"source": config_entries.SOURCE_HOMEKIT}, info
@ -382,25 +405,23 @@ class ZeroconfDiscovery:
# likely bad homekit data # likely bad homekit data
return return
if ATTR_NAME in info: if info.name:
lowercase_name: str | None = info[ATTR_NAME].lower() lowercase_name: str | None = info.name.lower()
else: else:
lowercase_name = None lowercase_name = None
if "macaddress" in info[ATTR_PROPERTIES]: if "macaddress" in info.properties:
uppercase_mac: str | None = info[ATTR_PROPERTIES]["macaddress"].upper() uppercase_mac: str | None = info.properties["macaddress"].upper()
else: else:
uppercase_mac = None uppercase_mac = None
if "manufacturer" in info[ATTR_PROPERTIES]: if "manufacturer" in info.properties:
lowercase_manufacturer: str | None = info[ATTR_PROPERTIES][ lowercase_manufacturer: str | None = info.properties["manufacturer"].lower()
"manufacturer"
].lower()
else: else:
lowercase_manufacturer = None lowercase_manufacturer = None
if "model" in info[ATTR_PROPERTIES]: if "model" in info.properties:
lowercase_model: str | None = info[ATTR_PROPERTIES]["model"].lower() lowercase_model: str | None = info.properties["model"].lower()
else: else:
lowercase_model = None lowercase_model = None

View File

@ -162,8 +162,8 @@ def get_device_discovery_info(
del result["properties"]["c#"] del result["properties"]["c#"]
if upper_case_props: if upper_case_props:
result["properties"] = { result.properties = {
key.upper(): val for (key, val) in result["properties"].items() key.upper(): val for (key, val) in result.properties.items()
} }
return result return result

View File

@ -1,4 +1,5 @@
"""Tests for the IPP config flow.""" """Tests for the IPP config flow."""
import dataclasses
from unittest.mock import patch from unittest.mock import patch
from homeassistant.components import zeroconf from homeassistant.components import zeroconf
@ -40,7 +41,7 @@ async def test_show_zeroconf_form(
"""Test that the zeroconf confirmation form is served.""" """Test that the zeroconf confirmation form is served."""
mock_connection(aioclient_mock) mock_connection(aioclient_mock)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -76,7 +77,7 @@ async def test_zeroconf_connection_error(
"""Test we abort zeroconf flow on IPP connection error.""" """Test we abort zeroconf flow on IPP connection error."""
mock_connection(aioclient_mock, conn_error=True) mock_connection(aioclient_mock, conn_error=True)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -93,7 +94,7 @@ async def test_zeroconf_confirm_connection_error(
"""Test we abort zeroconf flow on IPP connection error.""" """Test we abort zeroconf flow on IPP connection error."""
mock_connection(aioclient_mock, conn_error=True) mock_connection(aioclient_mock, conn_error=True)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_ZEROCONF}, data=discovery_info DOMAIN, context={"source": SOURCE_ZEROCONF}, data=discovery_info
) )
@ -126,7 +127,7 @@ async def test_zeroconf_connection_upgrade_required(
"""Test we abort zeroconf flow on IPP connection error.""" """Test we abort zeroconf flow on IPP connection error."""
mock_connection(aioclient_mock, conn_upgrade_error=True) mock_connection(aioclient_mock, conn_upgrade_error=True)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -160,7 +161,7 @@ async def test_zeroconf_parse_error(
"""Test we abort zeroconf flow on IPP parse error.""" """Test we abort zeroconf flow on IPP parse error."""
mock_connection(aioclient_mock, parse_error=True) mock_connection(aioclient_mock, parse_error=True)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -194,7 +195,7 @@ async def test_zeroconf_ipp_error(
"""Test we abort zeroconf flow on IPP error.""" """Test we abort zeroconf flow on IPP error."""
mock_connection(aioclient_mock, ipp_error=True) mock_connection(aioclient_mock, ipp_error=True)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -228,7 +229,7 @@ async def test_zeroconf_ipp_version_error(
"""Test we abort zeroconf flow on IPP version not supported error.""" """Test we abort zeroconf flow on IPP version not supported error."""
mock_connection(aioclient_mock, version_not_supported=True) mock_connection(aioclient_mock, version_not_supported=True)
discovery_info = {**MOCK_ZEROCONF_IPP_SERVICE_INFO} discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -262,7 +263,7 @@ async def test_zeroconf_device_exists_abort(
"""Test we abort zeroconf flow if printer already configured.""" """Test we abort zeroconf flow if printer already configured."""
await init_integration(hass, aioclient_mock, skip_setup=True) await init_integration(hass, aioclient_mock, skip_setup=True)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -279,13 +280,12 @@ async def test_zeroconf_with_uuid_device_exists_abort(
"""Test we abort zeroconf flow if printer already configured.""" """Test we abort zeroconf flow if printer already configured."""
await init_integration(hass, aioclient_mock, skip_setup=True) await init_integration(hass, aioclient_mock, skip_setup=True)
discovery_info = { discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
**MOCK_ZEROCONF_IPP_SERVICE_INFO, discovery_info.properties = {
"properties": { **MOCK_ZEROCONF_IPP_SERVICE_INFO[zeroconf.ATTR_PROPERTIES],
**MOCK_ZEROCONF_IPP_SERVICE_INFO[zeroconf.ATTR_PROPERTIES], "UUID": "cfe92100-67c4-11d4-a45f-f8d027761251",
"UUID": "cfe92100-67c4-11d4-a45f-f8d027761251",
},
} }
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -302,12 +302,10 @@ async def test_zeroconf_empty_unique_id(
"""Test zeroconf flow if printer lacks (empty) unique identification.""" """Test zeroconf flow if printer lacks (empty) unique identification."""
mock_connection(aioclient_mock, no_unique_id=True) mock_connection(aioclient_mock, no_unique_id=True)
discovery_info = { discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
**MOCK_ZEROCONF_IPP_SERVICE_INFO, discovery_info.properties = {
"properties": { **MOCK_ZEROCONF_IPP_SERVICE_INFO[zeroconf.ATTR_PROPERTIES],
**MOCK_ZEROCONF_IPP_SERVICE_INFO[zeroconf.ATTR_PROPERTIES], "UUID": "",
"UUID": "",
},
} }
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
@ -324,7 +322,7 @@ async def test_zeroconf_no_unique_id(
"""Test zeroconf flow if printer lacks unique identification.""" """Test zeroconf flow if printer lacks unique identification."""
mock_connection(aioclient_mock, no_unique_id=True) mock_connection(aioclient_mock, no_unique_id=True)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -371,7 +369,7 @@ async def test_full_zeroconf_flow_implementation(
"""Test the full manual user flow from start to finish.""" """Test the full manual user flow from start to finish."""
mock_connection(aioclient_mock) mock_connection(aioclient_mock)
discovery_info = MOCK_ZEROCONF_IPP_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPP_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -405,7 +403,7 @@ async def test_full_zeroconf_tls_flow_implementation(
"""Test the full manual user flow from start to finish.""" """Test the full manual user flow from start to finish."""
mock_connection(aioclient_mock, ssl=True) mock_connection(aioclient_mock, ssl=True)
discovery_info = MOCK_ZEROCONF_IPPS_SERVICE_INFO.copy() discovery_info = dataclasses.replace(MOCK_ZEROCONF_IPPS_SERVICE_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},

View File

@ -1,12 +1,12 @@
"""Define tests for the lookin config flow.""" """Define tests for the lookin config flow."""
from __future__ import annotations from __future__ import annotations
import dataclasses
from unittest.mock import patch from unittest.mock import patch
from aiolookin import NoUsableService from aiolookin import NoUsableService
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.components import zeroconf
from homeassistant.components.lookin.const import DOMAIN from homeassistant.components.lookin.const import DOMAIN
from homeassistant.const import CONF_HOST from homeassistant.const import CONF_HOST
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@ -138,8 +138,8 @@ async def test_discovered_zeroconf(hass):
assert mock_async_setup_entry.called assert mock_async_setup_entry.called
entry = hass.config_entries.async_entries(DOMAIN)[0] entry = hass.config_entries.async_entries(DOMAIN)[0]
zc_data_new_ip = ZEROCONF_DATA.copy() zc_data_new_ip = dataclasses.replace(ZEROCONF_DATA)
zc_data_new_ip[zeroconf.ATTR_HOST] = "127.0.0.2" zc_data_new_ip.host = "127.0.0.2"
with _patch_get_info(), patch( with _patch_get_info(), patch(
f"{MODULE}.async_setup_entry", return_value=True f"{MODULE}.async_setup_entry", return_value=True

View File

@ -1,4 +1,5 @@
"""Test the Roku config flow.""" """Test the Roku config flow."""
import dataclasses
from unittest.mock import patch from unittest.mock import patch
from homeassistant.components.roku.const import DOMAIN from homeassistant.components.roku.const import DOMAIN
@ -136,7 +137,7 @@ async def test_homekit_cannot_connect(
error=True, error=True,
) )
discovery_info = MOCK_HOMEKIT_DISCOVERY_INFO.copy() discovery_info = dataclasses.replace(MOCK_HOMEKIT_DISCOVERY_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={CONF_SOURCE: SOURCE_HOMEKIT}, context={CONF_SOURCE: SOURCE_HOMEKIT},
@ -151,7 +152,7 @@ async def test_homekit_unknown_error(
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker hass: HomeAssistant, aioclient_mock: AiohttpClientMocker
) -> None: ) -> None:
"""Test we abort homekit flow on unknown error.""" """Test we abort homekit flow on unknown error."""
discovery_info = MOCK_HOMEKIT_DISCOVERY_INFO.copy() discovery_info = dataclasses.replace(MOCK_HOMEKIT_DISCOVERY_INFO)
with patch( with patch(
"homeassistant.components.roku.config_flow.Roku.update", "homeassistant.components.roku.config_flow.Roku.update",
side_effect=Exception, side_effect=Exception,
@ -172,7 +173,7 @@ async def test_homekit_discovery(
"""Test the homekit discovery flow.""" """Test the homekit discovery flow."""
mock_connection(aioclient_mock, device="rokutv", host=HOMEKIT_HOST) mock_connection(aioclient_mock, device="rokutv", host=HOMEKIT_HOST)
discovery_info = MOCK_HOMEKIT_DISCOVERY_INFO.copy() discovery_info = dataclasses.replace(MOCK_HOMEKIT_DISCOVERY_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={CONF_SOURCE: SOURCE_HOMEKIT}, data=discovery_info DOMAIN, context={CONF_SOURCE: SOURCE_HOMEKIT}, data=discovery_info
) )
@ -200,7 +201,7 @@ async def test_homekit_discovery(
assert len(mock_setup_entry.mock_calls) == 1 assert len(mock_setup_entry.mock_calls) == 1
# test abort on existing host # test abort on existing host
discovery_info = MOCK_HOMEKIT_DISCOVERY_INFO.copy() discovery_info = dataclasses.replace(MOCK_HOMEKIT_DISCOVERY_INFO)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={CONF_SOURCE: SOURCE_HOMEKIT}, data=discovery_info DOMAIN, context={CONF_SOURCE: SOURCE_HOMEKIT}, data=discovery_info
) )