Move KNX keyring validation and storage to helper module (#97931)

* Move KNX keyfile validation and store to helper module

* Rename module and fix tests
This commit is contained in:
Matthias Alphart 2023-08-07 23:30:14 +02:00 committed by GitHub
parent 987897b0fa
commit 0f5d423d1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 39 deletions

View File

@ -3,8 +3,6 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from pathlib import Path
import shutil
from typing import Any, Final
import voluptuous as vol
@ -18,15 +16,13 @@ from xknx.io import DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT
from xknx.io.gateway_scanner import GatewayDescriptor, GatewayScanner
from xknx.io.self_description import request_description
from xknx.io.util import validate_ip as xknx_validate_ip
from xknx.secure.keyring import Keyring, XMLInterface, sync_load_keyring
from xknx.secure.keyring import Keyring, XMLInterface
from homeassistant.components.file_upload import process_uploaded_file
from homeassistant.config_entries import ConfigEntry, ConfigFlow, OptionsFlow
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowHandler, FlowResult
from homeassistant.helpers import selector
from homeassistant.helpers.storage import STORAGE_DIR
from homeassistant.helpers.typing import UNDEFINED
from .const import (
@ -60,6 +56,7 @@ from .const import (
TELEGRAM_LOG_MAX,
KNXConfigEntryData,
)
from .helpers.keyring import DEFAULT_KNX_KEYRING_FILENAME, save_uploaded_knxkeys_file
from .schema import ia_validator, ip_v4_validator
CONF_KNX_GATEWAY: Final = "gateway"
@ -77,7 +74,6 @@ DEFAULT_ENTRY_DATA = KNXConfigEntryData(
)
CONF_KEYRING_FILE: Final = "knxkeys_file"
DEFAULT_KNX_KEYRING_FILENAME: Final = "keyring.knxkeys"
CONF_KNX_TUNNELING_TYPE: Final = "tunneling_type"
CONF_KNX_TUNNELING_TYPE_LABELS: Final = {
@ -499,10 +495,15 @@ class KNXCommonFlow(ABC, FlowHandler):
if user_input is not None:
password = user_input[CONF_KNX_KNXKEY_PASSWORD]
errors = await self._save_uploaded_knxkeys_file(
uploaded_file_id=user_input[CONF_KEYRING_FILE],
password=password,
)
try:
self._keyring = await save_uploaded_knxkeys_file(
self.hass,
uploaded_file_id=user_input[CONF_KEYRING_FILE],
password=password,
)
except InvalidSecureConfiguration:
errors[CONF_KNX_KNXKEY_PASSWORD] = "keyfile_invalid_signature"
if not errors and self._keyring:
self.new_entry_data |= KNXConfigEntryData(
knxkeys_filename=f"{DOMAIN}/{DEFAULT_KNX_KEYRING_FILENAME}",
@ -711,33 +712,6 @@ class KNXCommonFlow(ABC, FlowHandler):
step_id="routing", data_schema=vol.Schema(fields), errors=errors
)
async def _save_uploaded_knxkeys_file(
self, uploaded_file_id: str, password: str
) -> dict[str, str]:
"""Validate the uploaded file and move it to the storage directory. Return errors."""
def _process_upload() -> tuple[Keyring | None, dict[str, str]]:
keyring: Keyring | None = None
errors = {}
with process_uploaded_file(self.hass, uploaded_file_id) as file_path:
try:
keyring = sync_load_keyring(
path=file_path,
password=password,
)
except InvalidSecureConfiguration:
errors[CONF_KNX_KNXKEY_PASSWORD] = "keyfile_invalid_signature"
else:
dest_path = Path(self.hass.config.path(STORAGE_DIR, DOMAIN))
dest_path.mkdir(exist_ok=True)
dest_file = dest_path / DEFAULT_KNX_KEYRING_FILENAME
shutil.move(file_path, dest_file)
return keyring, errors
keyring, errors = await self.hass.async_add_executor_job(_process_upload)
self._keyring = keyring
return errors
class KNXConfigFlow(KNXCommonFlow, ConfigFlow, domain=DOMAIN):
"""Handle a KNX config flow."""

View File

@ -0,0 +1,47 @@
"""KNX Keyring handler."""
import logging
from pathlib import Path
import shutil
from typing import Final
from xknx.exceptions.exception import InvalidSecureConfiguration
from xknx.secure.keyring import Keyring, sync_load_keyring
from homeassistant.components.file_upload import process_uploaded_file
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import STORAGE_DIR
from ..const import DOMAIN
_LOGGER = logging.getLogger(__name__)
DEFAULT_KNX_KEYRING_FILENAME: Final = "keyring.knxkeys"
async def save_uploaded_knxkeys_file(
hass: HomeAssistant, uploaded_file_id: str, password: str
) -> Keyring:
"""Validate the uploaded file and move it to the storage directory.
Return a Keyring object.
Raises InvalidSecureConfiguration if the file or password is invalid.
"""
def _process_upload() -> Keyring:
with process_uploaded_file(hass, uploaded_file_id) as file_path:
try:
keyring = sync_load_keyring(
path=file_path,
password=password,
)
except InvalidSecureConfiguration as err:
_LOGGER.debug(err)
raise
dest_path = Path(hass.config.path(STORAGE_DIR, DOMAIN))
dest_path.mkdir(exist_ok=True)
dest_file = dest_path / DEFAULT_KNX_KEYRING_FILENAME
shutil.move(file_path, dest_file)
return keyring
return await hass.async_add_executor_job(_process_upload)

View File

@ -71,9 +71,9 @@ def fixture_knx_setup():
def patch_file_upload(return_value=FIXTURE_KEYRING, side_effect=None):
"""Patch file upload. Yields the Keyring instance (return_value)."""
with patch(
"homeassistant.components.knx.config_flow.process_uploaded_file"
"homeassistant.components.knx.helpers.keyring.process_uploaded_file"
) as file_upload_mock, patch(
"homeassistant.components.knx.config_flow.sync_load_keyring",
"homeassistant.components.knx.helpers.keyring.sync_load_keyring",
return_value=return_value,
side_effect=side_effect,
), patch(