Compare commits

..

2 Commits

Author SHA1 Message Date
Stefan Agner
844308d755 Merge branch 'main' into handle-config-envs-none 2026-02-13 09:00:31 +01:00
Mike Degatano
844ce5b318 Handle nonetype error for envs on homeassistant stop 2026-02-12 22:00:21 +00:00
32 changed files with 319 additions and 652 deletions

View File

@@ -91,8 +91,8 @@ availability.
### Python Requirements
- **Compatibility**: Python 3.14+
- **Language Features**: Use modern Python features:
- **Compatibility**: Python 3.13+
- **Language Features**: Use modern Python features:
- Type hints with `typing` module
- f-strings (preferred over `%` or `.format()`)
- Dataclasses and enum classes

View File

@@ -5,53 +5,45 @@ categories:
- title: ":boom: Breaking Changes"
label: "breaking-change"
- title: ":wrench: Build"
label: "build"
- title: ":boar: Chore"
label: "chore"
- title: ":sparkles: New Features"
label: "new-feature"
- title: ":zap: Performance"
label: "performance"
- title: ":recycle: Refactor"
label: "refactor"
- title: ":green_heart: CI"
label: "ci"
- title: ":bug: Bug Fixes"
label: "bugfix"
- title: ":gem: Style"
label: "style"
- title: ":package: Refactor"
label: "refactor"
- title: ":rocket: Performance"
label: "performance"
- title: ":rotating_light: Test"
- title: ":white_check_mark: Test"
label: "test"
- title: ":hammer_and_wrench: Build"
label: "build"
- title: ":gear: CI"
label: "ci"
- title: ":recycle: Chore"
label: "chore"
- title: ":wastebasket: Revert"
label: "revert"
- title: ":arrow_up: Dependency Updates"
label: "dependencies"
collapse-after: 1
include-labels:
- "breaking-change"
- "build"
- "chore"
- "performance"
- "refactor"
- "new-feature"
- "bugfix"
- "style"
- "refactor"
- "performance"
- "test"
- "build"
- "ci"
- "chore"
- "revert"
- "dependencies"
- "test"
- "ci"
template: |

View File

@@ -33,7 +33,7 @@ on:
- setup.py
env:
DEFAULT_PYTHON: "3.14.3"
DEFAULT_PYTHON: "3.13"
COSIGN_VERSION: "v2.5.3"
CRANE_VERSION: "v0.20.7"
CRANE_SHA256: "8ef3564d264e6b5ca93f7b7f5652704c4dd29d33935aff6947dd5adefd05953e"
@@ -106,7 +106,7 @@ jobs:
- runs-on: ubuntu-24.04-arm
arch: aarch64
env:
WHEELS_ABI: cp314
WHEELS_ABI: cp313
WHEELS_TAG: musllinux_1_2
WHEELS_APK_DEPS: "libffi-dev;openssl-dev;yaml-dev"
WHEELS_SKIP_BINARY: aiohttp

View File

@@ -8,7 +8,7 @@ on:
pull_request: ~
env:
DEFAULT_PYTHON: "3.14.3"
DEFAULT_PYTHON: "3.13"
PRE_COMMIT_CACHE: ~/.cache/pre-commit
MYPY_CACHE_VERSION: 1

View File

@@ -9,7 +9,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@b5d41d4e1d5dceea10e7104786b73624c18a190f # v10.2.0
- uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
days-before-stale: 30

View File

@@ -1,7 +1,7 @@
image: ghcr.io/home-assistant/{arch}-hassio-supervisor
build_from:
aarch64: ghcr.io/home-assistant/aarch64-base-python:3.14-alpine3.22-2026.02.0
amd64: ghcr.io/home-assistant/amd64-base-python:3.14-alpine3.22-2026.02.0
aarch64: ghcr.io/home-assistant/aarch64-base-python:3.13-alpine3.22-2025.12.2
amd64: ghcr.io/home-assistant/amd64-base-python:3.13-alpine3.22-2025.12.2
cosign:
base_identity: https://github.com/home-assistant/docker-base/.*
identity: https://github.com/home-assistant/supervisor/.*

View File

@@ -1,9 +1,10 @@
aiodns==4.0.0
aiodocker==0.26.0
aiodocker==0.25.0
aiohttp==3.13.3
atomicwrites-homeassistant==1.4.1
attrs==25.4.0
awesomeversion==25.8.0
backports.zstd==1.3.0
blockbuster==1.5.26
brotli==1.2.0
ciso8601==2.3.3
@@ -13,6 +14,7 @@ cryptography==46.0.5
debugpy==1.8.20
deepmerge==2.0
dirhash==0.5.0
docker==7.1.0
faust-cchardet==2.1.19
gitpython==3.1.46
jinja2==3.1.6
@@ -23,7 +25,7 @@ pyudev==0.24.4
PyYAML==6.0.3
requests==2.32.5
securetar==2025.12.0
sentry-sdk==2.53.0
sentry-sdk==2.52.0
setuptools==82.0.0
voluptuous==0.16.0
dbus-fast==4.0.0

View File

@@ -2,14 +2,15 @@ astroid==4.0.3
coverage==7.13.4
mypy==1.19.1
pre-commit==4.5.1
pylint==4.0.5
pylint==4.0.4
pytest-aiohttp==1.1.0
pytest-asyncio==1.3.0
pytest-cov==7.0.0
pytest-timeout==2.4.0
pytest==9.0.2
ruff==0.15.2
ruff==0.15.1
time-machine==3.2.0
types-docker==7.1.0.20260109
types-pyyaml==6.0.12.20250915
types-requests==2.32.4.20260107
urllib3==2.6.3

View File

@@ -20,7 +20,7 @@ from typing import Any, Final, cast
import aiohttp
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from deepmerge import Merger
from securetar import AddFileError, SecureTarFile, atomic_contents_add
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
import voluptuous as vol
from voluptuous.humanize import humanize_error
@@ -76,7 +76,6 @@ from ..exceptions import (
AddonsError,
AddonsJobError,
AddonUnknownError,
BackupInvalidError,
BackupRestoreUnknownError,
ConfigurationFileError,
DockerBuildError,
@@ -1445,11 +1444,10 @@ class Addon(AddonModel):
tmp = TemporaryDirectory(dir=self.sys_config.path_tmp)
try:
with tar_file as backup:
# The tar filter rejects path traversal and absolute names,
# aborting restore of malicious backups with such exploits.
backup.extractall(
path=tmp.name,
filter="tar",
members=secure_path(backup),
filter="fully_trusted",
)
data = read_json_file(Path(tmp.name, "addon.json"))
@@ -1461,12 +1459,8 @@ class Addon(AddonModel):
try:
tmp, data = await self.sys_run_in_executor(_extract_tarfile)
except tarfile.FilterError as err:
raise BackupInvalidError(
f"Can't extract backup tarfile for {self.slug}: {err}",
_LOGGER.error,
) from err
except tarfile.TarError as err:
_LOGGER.error("Can't extract backup tarfile for %s: %s", self.slug, err)
raise BackupRestoreUnknownError() from err
except ConfigurationFileError as err:
raise AddonUnknownError(addon=self.slug) from err

View File

@@ -127,14 +127,14 @@ class APIAuth(CoreSysAttributes):
return {
ATTR_USERS: [
{
ATTR_USERNAME: user.username,
ATTR_NAME: user.name,
ATTR_IS_OWNER: user.is_owner,
ATTR_IS_ACTIVE: user.is_active,
ATTR_LOCAL_ONLY: user.local_only,
ATTR_GROUP_IDS: user.group_ids,
ATTR_USERNAME: user[ATTR_USERNAME],
ATTR_NAME: user[ATTR_NAME],
ATTR_IS_OWNER: user[ATTR_IS_OWNER],
ATTR_IS_ACTIVE: user[ATTR_IS_ACTIVE],
ATTR_LOCAL_ONLY: user[ATTR_LOCAL_ONLY],
ATTR_GROUP_IDS: user[ATTR_GROUP_IDS],
}
for user in await self.sys_auth.list_users()
if user.username
if user[ATTR_USERNAME]
]
}

View File

@@ -29,8 +29,8 @@ from ..const import (
HEADER_REMOTE_USER_NAME,
HEADER_TOKEN,
HEADER_TOKEN_OLD,
HomeAssistantUser,
IngressSessionData,
IngressSessionDataUser,
)
from ..coresys import CoreSysAttributes
from ..exceptions import HomeAssistantAPIError
@@ -75,6 +75,12 @@ def status_code_must_be_empty_body(code: int) -> bool:
class APIIngress(CoreSysAttributes):
"""Ingress view to handle add-on webui routing."""
_list_of_users: list[IngressSessionDataUser]
def __init__(self) -> None:
"""Initialize APIIngress."""
self._list_of_users = []
def _extract_addon(self, request: web.Request) -> Addon:
"""Return addon, throw an exception if it doesn't exist."""
token = request.match_info["token"]
@@ -300,15 +306,20 @@ class APIIngress(CoreSysAttributes):
return response
async def _find_user_by_id(self, user_id: str) -> HomeAssistantUser | None:
async def _find_user_by_id(self, user_id: str) -> IngressSessionDataUser | None:
"""Find user object by the user's ID."""
try:
users = await self.sys_homeassistant.list_users()
except HomeAssistantAPIError as err:
_LOGGER.warning("Could not fetch list of users: %s", err)
list_of_users = await self.sys_homeassistant.get_users()
except (HomeAssistantAPIError, TypeError) as err:
_LOGGER.error(
"%s error occurred while requesting list of users: %s", type(err), err
)
return None
return next((user for user in users if user.id == user_id), None)
if list_of_users is not None:
self._list_of_users = list_of_users
return next((user for user in self._list_of_users if user.id == user_id), None)
def _init_header(
@@ -321,8 +332,8 @@ def _init_header(
headers[HEADER_REMOTE_USER_ID] = session_data.user.id
if session_data.user.username is not None:
headers[HEADER_REMOTE_USER_NAME] = session_data.user.username
if session_data.user.name is not None:
headers[HEADER_REMOTE_USER_DISPLAY_NAME] = session_data.user.name
if session_data.user.display_name is not None:
headers[HEADER_REMOTE_USER_DISPLAY_NAME] = session_data.user.display_name
# filter flags
for name, value in request.headers.items():

View File

@@ -6,12 +6,13 @@ import logging
from typing import Any, TypedDict, cast
from .addons.addon import Addon
from .const import ATTR_PASSWORD, ATTR_USERNAME, FILE_HASSIO_AUTH, HomeAssistantUser
from .const import ATTR_PASSWORD, ATTR_TYPE, ATTR_USERNAME, FILE_HASSIO_AUTH
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import (
AuthHomeAssistantAPIValidationError,
AuthInvalidNonStringValueError,
AuthListUsersError,
AuthListUsersNoneResponseError,
AuthPasswordResetError,
HomeAssistantAPIError,
HomeAssistantWSError,
@@ -156,14 +157,22 @@ class Auth(FileConfiguration, CoreSysAttributes):
raise AuthPasswordResetError(user=username)
async def list_users(self) -> list[HomeAssistantUser]:
async def list_users(self) -> list[dict[str, Any]]:
"""List users on the Home Assistant instance."""
try:
return await self.sys_homeassistant.list_users()
users: (
list[dict[str, Any]] | None
) = await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: "config/auth/list"}
)
except HomeAssistantWSError as err:
_LOGGER.error("Can't request listing users on Home Assistant: %s", err)
raise AuthListUsersError() from err
if users is not None:
return users
raise AuthListUsersNoneResponseError(_LOGGER.error)
@staticmethod
def _rehash(value: str, salt2: str = "") -> str:
"""Rehash a value."""

View File

@@ -18,7 +18,7 @@ import time
from typing import Any, Self, cast
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from securetar import AddFileError, SecureTarFile, atomic_contents_add
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
import voluptuous as vol
from voluptuous.humanize import humanize_error
@@ -512,24 +512,12 @@ class Backup(JobGroup):
)
tmp = TemporaryDirectory(dir=str(backup_tarfile.parent))
try:
with tarfile.open(backup_tarfile, "r:") as tar:
# The tar filter rejects path traversal and absolute names,
# aborting restore of potentially crafted backups.
tar.extractall(
path=tmp.name,
filter="tar",
)
except tarfile.FilterError as err:
raise BackupInvalidError(
f"Can't read backup tarfile {backup_tarfile.as_posix()}: {err}",
_LOGGER.error,
) from err
except tarfile.TarError as err:
raise BackupError(
f"Can't read backup tarfile {backup_tarfile.as_posix()}: {err}",
_LOGGER.error,
) from err
with tarfile.open(backup_tarfile, "r:") as tar:
tar.extractall(
path=tmp.name,
members=secure_path(tar),
filter="fully_trusted",
)
return tmp
@@ -810,17 +798,10 @@ class Backup(JobGroup):
bufsize=BUF_SIZE,
password=self._password,
) as tar_file:
# The tar filter rejects path traversal and absolute names,
# aborting restore of potentially crafted backups.
tar_file.extractall(
path=origin_dir,
filter="tar",
path=origin_dir, members=tar_file, filter="fully_trusted"
)
_LOGGER.info("Restore folder %s done", name)
except tarfile.FilterError as err:
raise BackupInvalidError(
f"Can't restore folder {name}: {err}", _LOGGER.warning
) from err
except (tarfile.TarError, OSError) as err:
raise BackupError(
f"Can't restore folder {name}: {err}", _LOGGER.warning

View File

@@ -1,12 +1,11 @@
"""Constants file for Supervisor."""
from collections.abc import Mapping
from dataclasses import dataclass
from enum import StrEnum
from ipaddress import IPv4Network, IPv6Network
from pathlib import Path
from sys import version_info as systemversion
from typing import Any, NotRequired, Self, TypedDict
from typing import NotRequired, Self, TypedDict
from aiohttp import __version__ as aiohttpversion
@@ -537,77 +536,60 @@ class CpuArch(StrEnum):
AMD64 = "amd64"
@dataclass
class HomeAssistantUser:
"""A Home Assistant Core user.
Incomplete model — Core's User object has additional fields
(credentials, refresh_tokens, etc.) that are not represented here.
Only fields used by the Supervisor are included.
"""
id: str
username: str | None = None
name: str | None = None
is_owner: bool = False
is_active: bool = False
local_only: bool = False
system_generated: bool = False
group_ids: list[str] | None = None
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
"""Return object from dictionary representation."""
return cls(
id=data["id"],
username=data.get("username"),
# "displayname" is a legacy key from old ingress session data
name=data.get("name") or data.get("displayname"),
is_owner=data.get("is_owner", False),
is_active=data.get("is_active", False),
local_only=data.get("local_only", False),
system_generated=data.get("system_generated", False),
group_ids=data.get("group_ids"),
)
class IngressSessionDataUserDict(TypedDict):
"""Serialization format for user data stored in ingress sessions.
Legacy data may contain "displayname" instead of "name".
"""
"""Response object for ingress session user."""
id: str
username: NotRequired[str | None]
# Name is an alias for displayname, only one should be used
displayname: NotRequired[str | None]
name: NotRequired[str | None]
@dataclass
class IngressSessionDataUser:
"""Format of an IngressSessionDataUser object."""
id: str
display_name: str | None = None
username: str | None = None
def to_dict(self) -> IngressSessionDataUserDict:
"""Get dictionary representation."""
return IngressSessionDataUserDict(
id=self.id, displayname=self.display_name, username=self.username
)
@classmethod
def from_dict(cls, data: IngressSessionDataUserDict) -> Self:
"""Return object from dictionary representation."""
return cls(
id=data["id"],
display_name=data.get("displayname") or data.get("name"),
username=data.get("username"),
)
class IngressSessionDataDict(TypedDict):
"""Serialization format for ingress session data."""
"""Response object for ingress session data."""
user: IngressSessionDataUserDict
@dataclass
class IngressSessionData:
"""Ingress session data attached to a session token."""
"""Format of an IngressSessionData object."""
user: HomeAssistantUser
user: IngressSessionDataUser
def to_dict(self) -> IngressSessionDataDict:
"""Get dictionary representation."""
return IngressSessionDataDict(
user=IngressSessionDataUserDict(
id=self.user.id,
name=self.user.name,
username=self.user.username,
)
)
return IngressSessionDataDict(user=self.user.to_dict())
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
def from_dict(cls, data: IngressSessionDataDict) -> Self:
"""Return object from dictionary representation."""
return cls(user=HomeAssistantUser.from_dict(data["user"]))
return cls(user=IngressSessionDataUser.from_dict(data["user"]))
STARTING_STATES = [

View File

@@ -306,11 +306,9 @@ class DeviceType(DBusIntEnum):
WIRELESS = 2
BLUETOOTH = 5
VLAN = 11
BRIDGE = 13
TUN = 16
VETH = 20
WIREGUARD = 29
WIFI_P2P = 30
LOOPBACK = 32

View File

@@ -64,8 +64,8 @@ class DockerHomeAssistant(DockerInterface):
"""Return timeout for Docker actions."""
# Use S6_SERVICES_GRACETIME to avoid killing Home Assistant Core, see
# https://github.com/home-assistant/core/tree/dev/Dockerfile
if self.meta_config and "Env" in self.meta_config:
for env in self.meta_config["Env"]:
if self.meta_config and (envs := self.meta_config.get("Env")):
for env in envs:
if match := ENV_S6_GRACETIME.match(env):
return 20 + int(int(match.group(1)) / 1000)

View File

@@ -7,6 +7,7 @@ from collections.abc import Mapping
from contextlib import suppress
from dataclasses import dataclass
import errno
from functools import partial
from http import HTTPStatus
from io import BufferedReader, BufferedWriter
from ipaddress import IPv4Address
@@ -24,6 +25,8 @@ from aiodocker.stream import Stream
from aiodocker.types import JSONObject
from aiohttp import ClientTimeout, UnixConnector
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from docker import errors as docker_errors
from docker.client import DockerClient
import requests
from ..const import (
@@ -267,6 +270,8 @@ class DockerAPI(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize Docker base wrapper."""
self.coresys = coresys
# We keep both until we can fully refactor to aiodocker
self._dockerpy: DockerClient | None = None
self.docker: aiodocker.Docker = aiodocker.Docker(
url="unix://localhost", # dummy hostname for URL composition
connector=UnixConnector(SOCKET_DOCKER.as_posix()),
@@ -284,6 +289,15 @@ class DockerAPI(CoreSysAttributes):
async def post_init(self) -> Self:
"""Post init actions that must be done in event loop."""
self._dockerpy = await asyncio.get_running_loop().run_in_executor(
None,
partial(
DockerClient,
base_url=f"unix:/{SOCKET_DOCKER.as_posix()}",
version="auto",
timeout=900,
),
)
self._info = await DockerInfo.new(await self.docker.system.info())
await self.config.read_data()
self._network = await DockerNetwork(self.docker).post_init(
@@ -291,6 +305,13 @@ class DockerAPI(CoreSysAttributes):
)
return self
@property
def dockerpy(self) -> DockerClient:
"""Get docker API client."""
if not self._dockerpy:
raise RuntimeError("Docker API Client not initialized!")
return self._dockerpy
@property
def network(self) -> DockerNetwork:
"""Get Docker network."""
@@ -704,40 +725,43 @@ class DockerAPI(CoreSysAttributes):
async def repair(self) -> None:
"""Repair local docker overlayfs2 issues."""
_LOGGER.info("Prune stale containers")
try:
output = await self.docker.containers.prune()
_LOGGER.debug("Containers prune: %s", output)
except aiodocker.DockerError as err:
_LOGGER.warning("Error for containers prune: %s", err)
def repair_docker_blocking():
_LOGGER.info("Prune stale containers")
try:
output = self.dockerpy.api.prune_containers()
_LOGGER.debug("Containers prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for containers prune: %s", err)
_LOGGER.info("Prune stale images")
try:
output = await self.images.prune(filters={"dangling": "false"})
_LOGGER.debug("Images prune: %s", output)
except aiodocker.DockerError as err:
_LOGGER.warning("Error for images prune: %s", err)
_LOGGER.info("Prune stale images")
try:
output = self.dockerpy.api.prune_images(filters={"dangling": False})
_LOGGER.debug("Images prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for images prune: %s", err)
_LOGGER.info("Prune stale builds")
try:
output = await self.images.prune_builds()
_LOGGER.debug("Builds prune: %s", output)
except aiodocker.DockerError as err:
_LOGGER.warning("Error for builds prune: %s", err)
_LOGGER.info("Prune stale builds")
try:
output = self.dockerpy.api.prune_builds()
_LOGGER.debug("Builds prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for builds prune: %s", err)
_LOGGER.info("Prune stale volumes")
try:
output = await self.docker.volumes.prune()
_LOGGER.debug("Volumes prune: %s", output)
except aiodocker.DockerError as err:
_LOGGER.warning("Error for volumes prune: %s", err)
_LOGGER.info("Prune stale volumes")
try:
output = self.dockerpy.api.prune_volumes()
_LOGGER.debug("Volumes prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for volumes prune: %s", err)
_LOGGER.info("Prune stale networks")
try:
output = await self.docker.networks.prune()
_LOGGER.debug("Networks prune: %s", output)
except aiodocker.DockerError as err:
_LOGGER.warning("Error for networks prune: %s", err)
_LOGGER.info("Prune stale networks")
try:
output = self.dockerpy.api.prune_networks()
_LOGGER.debug("Networks prune: %s", output)
except docker_errors.APIError as err:
_LOGGER.warning("Error for networks prune: %s", err)
await self.sys_run_in_executor(repair_docker_blocking)
_LOGGER.info("Fix stale container on hassio network")
try:

View File

@@ -620,6 +620,18 @@ class AuthListUsersError(AuthError, APIUnknownSupervisorError):
message_template = "Can't request listing users on Home Assistant"
class AuthListUsersNoneResponseError(AuthError, APIInternalServerError):
"""Auth error if listing users returned invalid None response."""
error_key = "auth_list_users_none_response_error"
message_template = "Home Assistant returned invalid response of `{none}` instead of a list of users. Check Home Assistant logs for details (check with `{logs_command}`)"
extra_fields = {"none": "None", "logs_command": "ha core logs"}
def __init__(self, logger: Callable[..., None] | None = None) -> None:
"""Initialize exception."""
super().__init__(None, logger)
class AuthInvalidNonStringValueError(AuthError, APIUnauthorized):
"""Auth error if something besides a string provided as username or password."""

View File

@@ -182,53 +182,28 @@ class HomeAssistantCore(JobGroup):
concurrency=JobConcurrency.GROUP_REJECT,
)
async def install(self) -> None:
"""Install Home Assistant Core."""
"""Install a landing page."""
_LOGGER.info("Home Assistant setup")
stop_progress_log = asyncio.Event()
while True:
# read homeassistant tag and install it
if not self.sys_homeassistant.latest_version:
await self.sys_updater.reload()
async def _periodic_progress_log() -> None:
"""Log installation progress periodically for user visibility."""
while not stop_progress_log.is_set():
if to_version := self.sys_homeassistant.latest_version:
try:
await asyncio.wait_for(stop_progress_log.wait(), timeout=15)
except TimeoutError:
if (job := self.instance.active_job) and job.progress:
_LOGGER.info(
"Downloading Home Assistant Core image, %d%%",
int(job.progress),
)
else:
_LOGGER.info("Home Assistant Core installation in progress")
await self.instance.update(
to_version,
image=self.sys_updater.image_homeassistant,
)
self.sys_homeassistant.version = self.instance.version or to_version
break
except (DockerError, JobException):
pass
except Exception as err: # pylint: disable=broad-except
await async_capture_exception(err)
progress_task = self.sys_create_task(_periodic_progress_log())
try:
while True:
# read homeassistant tag and install it
if not self.sys_homeassistant.latest_version:
await self.sys_updater.reload()
if to_version := self.sys_homeassistant.latest_version:
try:
await self.instance.update(
to_version,
image=self.sys_updater.image_homeassistant,
)
self.sys_homeassistant.version = (
self.instance.version or to_version
)
break
except (DockerError, JobException):
pass
except Exception as err: # pylint: disable=broad-except
await async_capture_exception(err)
_LOGGER.warning(
"Error on Home Assistant installation. Retrying in 30sec"
)
await asyncio.sleep(30)
finally:
stop_progress_log.set()
await progress_task
_LOGGER.warning("Error on Home Assistant installation. Retrying in 30sec")
await asyncio.sleep(30)
_LOGGER.info("Home Assistant docker now installed")
self.sys_homeassistant.set_image(self.sys_updater.image_homeassistant)

View File

@@ -1,6 +1,7 @@
"""Home Assistant control object."""
import asyncio
from datetime import timedelta
import errno
from ipaddress import IPv4Address
import logging
@@ -12,7 +13,7 @@ from typing import Any
from uuid import UUID
from awesomeversion import AwesomeVersion, AwesomeVersionException
from securetar import AddFileError, SecureTarFile, atomic_contents_add
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
import voluptuous as vol
from voluptuous.humanize import humanize_error
@@ -34,11 +35,11 @@ from ..const import (
ATTR_WATCHDOG,
FILE_HASSIO_HOMEASSISTANT,
BusEvent,
HomeAssistantUser,
IngressSessionDataUser,
IngressSessionDataUserDict,
)
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
BackupInvalidError,
ConfigurationFileError,
HomeAssistantBackupError,
HomeAssistantError,
@@ -46,6 +47,7 @@ from ..exceptions import (
)
from ..hardware.const import PolicyGroup
from ..hardware.data import Device
from ..jobs.const import JobConcurrency, JobThrottle
from ..jobs.decorator import Job
from ..resolution.const import UnhealthyReason
from ..utils import remove_folder, remove_folder_with_excludes
@@ -493,16 +495,11 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
# extract backup
try:
with tar_file as backup:
# The tar filter rejects path traversal and absolute names,
# aborting restore of potentially crafted backups.
backup.extractall(
path=temp_path,
filter="tar",
members=secure_path(backup),
filter="fully_trusted",
)
except tarfile.FilterError as err:
raise BackupInvalidError(
f"Invalid tarfile {tar_file}: {err}", _LOGGER.error
) from err
except tarfile.TarError as err:
raise HomeAssistantError(
f"Can't read tarfile {tar_file}: {err}", _LOGGER.error
@@ -573,12 +570,21 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
if attr in data:
self._data[attr] = data[attr]
async def list_users(self) -> list[HomeAssistantUser]:
"""Fetch list of all users from Home Assistant Core via WebSocket.
Raises HomeAssistantWSError on WebSocket connection/communication failure.
"""
raw: list[dict[str, Any]] = await self.websocket.async_send_command(
@Job(
name="home_assistant_get_users",
throttle_period=timedelta(minutes=5),
internal=True,
concurrency=JobConcurrency.QUEUE,
throttle=JobThrottle.THROTTLE,
)
async def get_users(self) -> list[IngressSessionDataUser]:
"""Get list of all configured users."""
list_of_users: (
list[IngressSessionDataUserDict] | None
) = await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: "config/auth/list"}
)
return [HomeAssistantUser.from_dict(data) for data in raw]
if list_of_users:
return [IngressSessionDataUser.from_dict(data) for data in list_of_users]
return []

View File

@@ -65,7 +65,7 @@ class WSClient:
if not self._client.closed:
await self._client.close()
async def async_send_command(self, message: dict[str, Any]) -> T:
async def async_send_command(self, message: dict[str, Any]) -> T | None:
"""Send a websocket message, and return the response."""
self._message_id += 1
message["id"] = self._message_id
@@ -146,7 +146,7 @@ class WSClient:
try:
client = await session.ws_connect(url, ssl=False)
except aiohttp.client_exceptions.ClientConnectorError:
raise HomeAssistantWSConnectionError("Can't connect") from None
raise HomeAssistantWSError("Can't connect") from None
hello_message = await client.receive_json()
@@ -200,11 +200,10 @@ class HomeAssistantWebSocket(CoreSysAttributes):
async def _ensure_connected(self) -> None:
"""Ensure WebSocket connection is ready.
Raises HomeAssistantWSConnectionError if unable to connect.
Raises HomeAssistantAuthError if authentication with Core fails.
Raises HomeAssistantWSError if unable to connect.
"""
if self.sys_core.state in CLOSING_STATES:
raise HomeAssistantWSConnectionError(
raise HomeAssistantWSError(
"WebSocket not available, system is shutting down"
)
@@ -212,7 +211,7 @@ class HomeAssistantWebSocket(CoreSysAttributes):
# If we are already connected, we can avoid the check_api_state call
# since it makes a new socket connection and we already have one.
if not connected and not await self.sys_homeassistant.api.check_api_state():
raise HomeAssistantWSConnectionError(
raise HomeAssistantWSError(
"Can't connect to Home Assistant Core WebSocket, the API is not reachable"
)
@@ -252,10 +251,10 @@ class HomeAssistantWebSocket(CoreSysAttributes):
await self._client.close()
self._client = None
async def async_send_command(self, message: dict[str, Any]) -> T:
async def async_send_command(self, message: dict[str, Any]) -> T | None:
"""Send a command and return the response.
Raises HomeAssistantWSError on WebSocket connection or communication failure.
Raises HomeAssistantWSError if unable to connect to Home Assistant Core.
"""
await self._ensure_connected()
# _ensure_connected guarantees self._client is set

View File

@@ -1,11 +1,11 @@
{
"local": {
"name": "Local apps",
"name": "Local add-ons",
"url": "https://home-assistant.io/hassio",
"maintainer": "you"
},
"core": {
"name": "Official apps",
"name": "Official add-ons",
"url": "https://home-assistant.io/addons",
"maintainer": "Home Assistant"
}

View File

@@ -31,7 +31,6 @@ from .const import (
ATTR_LOGGING,
ATTR_MTU,
ATTR_MULTICAST,
ATTR_NAME,
ATTR_OBSERVER,
ATTR_OTA,
ATTR_PASSWORD,
@@ -207,9 +206,7 @@ SCHEMA_SESSION_DATA = vol.Schema(
{
vol.Required(ATTR_ID): str,
vol.Required(ATTR_USERNAME, default=None): vol.Maybe(str),
vol.Required(ATTR_NAME, default=None): vol.Maybe(str),
# Legacy key, replaced by ATTR_NAME
vol.Optional(ATTR_DISPLAYNAME): vol.Maybe(str),
vol.Required(ATTR_DISPLAYNAME, default=None): vol.Maybe(str),
}
)
}

View File

@@ -1,6 +1,7 @@
"""Test auth API."""
from datetime import UTC, datetime, timedelta
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from aiohttp.hdrs import WWW_AUTHENTICATE
@@ -168,25 +169,46 @@ async def test_list_users(
]
async def test_list_users_ws_error(
@pytest.mark.parametrize(
("send_command_mock", "error_response", "expected_log"),
[
(
AsyncMock(return_value=None),
{
"result": "error",
"message": "Home Assistant returned invalid response of `None` instead of a list of users. Check Home Assistant logs for details (check with `ha core logs`)",
"error_key": "auth_list_users_none_response_error",
"extra_fields": {"none": "None", "logs_command": "ha core logs"},
},
"Home Assistant returned invalid response of `None` instead of a list of users. Check Home Assistant logs for details (check with `ha core logs`)",
),
(
AsyncMock(side_effect=HomeAssistantWSError("fail")),
{
"result": "error",
"message": "Can't request listing users on Home Assistant. Check supervisor logs for details (check with 'ha supervisor logs')",
"error_key": "auth_list_users_error",
"extra_fields": {"logs_command": "ha supervisor logs"},
},
"Can't request listing users on Home Assistant: fail",
),
],
)
async def test_list_users_failure(
api_client: TestClient,
ha_ws_client: AsyncMock,
caplog: pytest.LogCaptureFixture,
send_command_mock: AsyncMock,
error_response: dict[str, Any],
expected_log: str,
):
"""Test WS error when listing users via API."""
ha_ws_client.async_send_command = AsyncMock(
side_effect=HomeAssistantWSError("fail")
)
"""Test failure listing users via API."""
ha_ws_client.async_send_command = send_command_mock
resp = await api_client.get("/auth/list")
assert resp.status == 500
result = await resp.json()
assert result == {
"result": "error",
"message": "Can't request listing users on Home Assistant. Check supervisor logs for details (check with 'ha supervisor logs')",
"error_key": "auth_list_users_error",
"extra_fields": {"logs_command": "ha supervisor logs"},
}
assert "Can't request listing users on Home Assistant: fail" in caplog.text
assert result == error_response
assert expected_log in caplog.text
@pytest.mark.parametrize(

View File

@@ -99,7 +99,9 @@ async def test_validate_session_with_user_id(
assert session in coresys.ingress.sessions_data
assert coresys.ingress.get_session_data(session).user.id == "some-id"
assert coresys.ingress.get_session_data(session).user.username == "sn"
assert coresys.ingress.get_session_data(session).user.name == "Some Name"
assert (
coresys.ingress.get_session_data(session).user.display_name == "Some Name"
)
async def test_ingress_proxy_no_content_type_for_empty_body_responses(

View File

@@ -1,257 +0,0 @@
"""Security tests for backup tar extraction with tar filter."""
import io
from pathlib import Path
import tarfile
import pytest
from securetar import SecureTarFile
from supervisor.addons.addon import Addon
from supervisor.backups.backup import Backup
from supervisor.backups.const import BackupType
from supervisor.coresys import CoreSys
from supervisor.exceptions import BackupInvalidError
def _create_tar_gz(
path: Path,
members: list[tarfile.TarInfo],
file_data: dict[str, bytes] | None = None,
) -> None:
"""Create a tar.gz file with specified members."""
if file_data is None:
file_data = {}
with tarfile.open(path, "w:gz") as tar:
for info in members:
data = file_data.get(info.name)
if data is not None:
tar.addfile(info, io.BytesIO(data))
else:
tar.addfile(info)
def test_path_traversal_rejected(tmp_path: Path):
    """A member whose name climbs out of the destination must be refused."""
    evil_name = "../../etc/passwd"
    member = tarfile.TarInfo(name=evil_name)
    member.size = 9

    archive = tmp_path / "test.tar.gz"
    _create_tar_gz(archive, [member], {evil_name: b"malicious"})

    extract_dir = tmp_path / "out"
    extract_dir.mkdir()
    with tarfile.open(archive, "r:gz") as tar:
        # The "tar" extraction filter raises before writing outside dest.
        with pytest.raises(tarfile.OutsideDestinationError):
            tar.extractall(path=extract_dir, filter="tar")
def test_symlink_write_through_rejected(tmp_path: Path):
    """A file entry routed through an escaping symlink must be refused.

    The tar filter resolves real paths of already-extracted entries on
    disk, so writing through a previously extracted out-of-tree symlink
    is caught even without explicit link-target validation.
    """
    # First a symlink pointing outside the destination...
    symlink = tarfile.TarInfo(name="escape")
    symlink.type = tarfile.SYMTYPE
    symlink.linkname = "../outside"

    # ...then a file entry that would be written through that symlink.
    payload = tarfile.TarInfo(name="escape/evil.py")
    payload.size = 9

    archive = tmp_path / "test.tar.gz"
    _create_tar_gz(archive, [symlink, payload], {"escape/evil.py": b"malicious"})

    extract_dir = tmp_path / "out"
    extract_dir.mkdir()
    with tarfile.open(archive, "r:gz") as tar:
        with pytest.raises(tarfile.OutsideDestinationError):
            tar.extractall(path=extract_dir, filter="tar")

    # Nothing may have leaked outside the extraction destination.
    assert not (tmp_path / "outside" / "evil.py").exists()
def test_absolute_name_stripped_and_extracted(tmp_path: Path):
    """An absolute member name loses its leading / and extracts inside dest."""
    member = tarfile.TarInfo(name="/etc/test.conf")
    member.size = 5

    archive = tmp_path / "test.tar.gz"
    _create_tar_gz(archive, [member], {"/etc/test.conf": b"hello"})

    extract_dir = tmp_path / "out"
    extract_dir.mkdir()
    with tarfile.open(archive, "r:gz") as tar:
        tar.extractall(path=extract_dir, filter="tar")

    # The leading slash is stripped, so the file lands under dest/etc/.
    assert (extract_dir / "etc" / "test.conf").read_text() == "hello"
def test_valid_backup_with_internal_symlinks(tmp_path: Path):
    """Internal, relative symlinks are legitimate and must survive extraction."""
    directory = tarfile.TarInfo(name="subdir")
    directory.type = tarfile.DIRTYPE
    directory.mode = 0o755

    config = tarfile.TarInfo(name="subdir/config.yaml")
    config.size = 11

    # Relative symlink that stays inside the archive tree.
    link = tarfile.TarInfo(name="config_link")
    link.type = tarfile.SYMTYPE
    link.linkname = "subdir/config.yaml"

    archive = tmp_path / "test.tar.gz"
    _create_tar_gz(
        archive,
        [directory, config, link],
        {"subdir/config.yaml": b"key: value\n"},
    )

    extract_dir = tmp_path / "out"
    extract_dir.mkdir()
    with tarfile.open(archive, "r:gz") as tar:
        tar.extractall(path=extract_dir, filter="tar")

    assert (extract_dir / "subdir" / "config.yaml").read_text() == "key: value\n"
    link_path = extract_dir / "config_link"
    assert link_path.is_symlink()
    assert link_path.read_text() == "key: value\n"
def test_uid_gid_preserved(tmp_path: Path):
    """The tar filter must keep the uid/gid recorded in the archive."""
    member = tarfile.TarInfo(name="owned_file.txt")
    member.size = 5
    member.uid = 1000
    member.gid = 1000

    archive = tmp_path / "test.tar.gz"
    _create_tar_gz(archive, [member], {"owned_file.txt": b"hello"})

    extract_dir = tmp_path / "out"
    extract_dir.mkdir()
    with tarfile.open(archive, "r:gz") as tar:
        # Run every member through the filter only — no need to extract to
        # check that ownership metadata passes through unchanged.
        for entry in tar:
            filtered = tarfile.tar_filter(entry, str(extract_dir))
            assert filtered.uid == 1000
            assert filtered.gid == 1000
async def test_backup_open_rejects_path_traversal(coresys: CoreSys, tmp_path: Path):
    """Backup.open() must refuse an archive containing traversal member names."""
    archive = tmp_path / "malicious.tar"
    member = tarfile.TarInfo(name="../../etc/passwd")
    member.size = 9
    with tarfile.open(archive, "w:") as tar:
        tar.addfile(member, io.BytesIO(b"malicious"))

    malicious_backup = Backup(coresys, archive, "test", None)
    with pytest.raises(BackupInvalidError):
        async with malicious_backup.open(None):
            pass
async def test_homeassistant_restore_rejects_path_traversal(
    coresys: CoreSys, tmp_supervisor_data: Path
):
    """Home Assistant restore must reject a tar with traversal member names."""
    archive = tmp_supervisor_data / "homeassistant.tar.gz"
    member = tarfile.TarInfo(name="../../etc/passwd")
    member.size = 9
    _create_tar_gz(archive, [member], {"../../etc/passwd": b"malicious"})

    with pytest.raises(BackupInvalidError):
        await coresys.homeassistant.restore(SecureTarFile(archive, "r", gzip=True))
async def test_addon_restore_rejects_path_traversal(
    coresys: CoreSys, install_addon_ssh: Addon, tmp_supervisor_data: Path
):
    """Add-on restore must reject a tar with traversal member names."""
    archive = tmp_supervisor_data / "addon.tar.gz"
    member = tarfile.TarInfo(name="../../etc/passwd")
    member.size = 9
    _create_tar_gz(archive, [member], {"../../etc/passwd": b"malicious"})

    with pytest.raises(BackupInvalidError):
        await install_addon_ssh.restore(SecureTarFile(archive, "r", gzip=True))
async def test_addon_restore_rejects_symlink_escape(
    coresys: CoreSys, install_addon_ssh: Addon, tmp_supervisor_data: Path
):
    """Add-on restore must reject writes routed through an escaping symlink."""
    # Symlink pointing outside the destination, plus a file written through it.
    symlink = tarfile.TarInfo(name="escape")
    symlink.type = tarfile.SYMTYPE
    symlink.linkname = "../outside"

    payload = tarfile.TarInfo(name="escape/evil.py")
    payload.size = 9

    archive = tmp_supervisor_data / "addon.tar.gz"
    _create_tar_gz(archive, [symlink, payload], {"escape/evil.py": b"malicious"})

    with pytest.raises(BackupInvalidError):
        await install_addon_ssh.restore(SecureTarFile(archive, "r", gzip=True))
async def test_folder_restore_rejects_path_traversal(
    coresys: CoreSys, tmp_supervisor_data: Path
):
    """Folder restore must fail when the share tar contains traversal names."""
    member = tarfile.TarInfo(name="../../etc/passwd")
    member.size = 9

    # Build an outer backup tar that embeds the malicious share-folder tar.
    outer_path = tmp_supervisor_data / "backup.tar"
    with tarfile.open(outer_path, "w:") as outer_tar:
        inner_path = tmp_supervisor_data / "share.tar.gz"
        _create_tar_gz(inner_path, [member], {"../../etc/passwd": b"malicious"})
        outer_tar.add(inner_path, arcname="./share.tar.gz")

    backup = Backup(coresys, outer_path, "test", None)
    backup.new("test", "2025-01-01", BackupType.PARTIAL, compressed=True)
    async with backup.open(None):
        # Restore reports failure instead of extracting outside the tree.
        assert await backup.restore_folders(["share"]) is False
async def test_folder_restore_rejects_symlink_escape(
    coresys: CoreSys, tmp_supervisor_data: Path
):
    """Folder restore must fail when the share tar escapes via a symlink."""
    # Symlink pointing outside the destination, plus a file written through it.
    symlink = tarfile.TarInfo(name="escape")
    symlink.type = tarfile.SYMTYPE
    symlink.linkname = "../outside"

    payload = tarfile.TarInfo(name="escape/evil.py")
    payload.size = 9

    # Build an outer backup tar that embeds the malicious share-folder tar.
    outer_path = tmp_supervisor_data / "backup.tar"
    with tarfile.open(outer_path, "w:") as outer_tar:
        inner_path = tmp_supervisor_data / "share.tar.gz"
        _create_tar_gz(inner_path, [symlink, payload], {"escape/evil.py": b"malicious"})
        outer_tar.add(inner_path, arcname="./share.tar.gz")

    backup = Backup(coresys, outer_path, "test", None)
    backup.new("test", "2025-01-01", BackupType.PARTIAL, compressed=True)
    async with backup.open(None):
        # Restore reports failure instead of writing outside the tree.
        assert await backup.restore_folders(["share"]) is False

View File

@@ -15,7 +15,6 @@ from aiodocker.events import DockerEvents
from aiodocker.execs import Exec
from aiodocker.networks import DockerNetwork, DockerNetworks
from aiodocker.system import DockerSystem
from aiodocker.volumes import DockerVolumes
from aiohttp import ClientSession, web
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
@@ -161,6 +160,7 @@ async def docker() -> DockerAPI:
}
with (
patch("supervisor.docker.manager.DockerClient", return_value=MagicMock()),
patch(
"supervisor.docker.manager.aiodocker.Docker",
return_value=(
@@ -170,7 +170,6 @@ async def docker() -> DockerAPI:
containers=(docker_containers := MagicMock(spec=DockerContainers)),
events=(docker_events := MagicMock(spec=DockerEvents)),
system=(docker_system := MagicMock(spec=DockerSystem)),
volumes=MagicMock(spec=DockerVolumes),
)
),
),

View File

@@ -9,6 +9,7 @@ import aiodocker
from aiodocker.containers import DockerContainer
from aiodocker.networks import DockerNetwork
from awesomeversion import AwesomeVersion
from docker.errors import APIError
import pytest
from supervisor.const import DNS_SUFFIX, ENV_SUPERVISOR_CPU_RT
@@ -183,6 +184,14 @@ async def test_run_command_custom_stdout_stderr(
async def test_run_command_with_mounts(docker: DockerAPI):
"""Test command execution with mounts are correctly converted."""
# Mock container and its methods
mock_container = MagicMock()
mock_container.wait.return_value = {"StatusCode": 0}
mock_container.logs.return_value = ["output"]
# Mock docker containers.run to return our mock container
docker.dockerpy.containers.run.return_value = mock_container
# Create test mounts
mounts = [
DockerMount(
@@ -447,13 +456,13 @@ async def test_repair(
await coresys.docker.repair()
coresys.docker.docker.containers.prune.assert_called_once()
coresys.docker.docker.images.prune.assert_called_once_with(
filters={"dangling": "false"}
coresys.docker.dockerpy.api.prune_containers.assert_called_once()
coresys.docker.dockerpy.api.prune_images.assert_called_once_with(
filters={"dangling": False}
)
coresys.docker.docker.images.prune_builds.assert_called_once()
coresys.docker.docker.volumes.prune.assert_called_once()
coresys.docker.docker.networks.prune.assert_called_once()
coresys.docker.dockerpy.api.prune_builds.assert_called_once()
coresys.docker.dockerpy.api.prune_volumes.assert_called_once()
coresys.docker.dockerpy.api.prune_networks.assert_called_once()
hassio.disconnect.assert_called_once_with({"Container": "corrupt", "Force": True})
host.disconnect.assert_not_called()
assert "Docker fatal error on container fail on hassio" in caplog.text
@@ -461,27 +470,24 @@ async def test_repair(
async def test_repair_failures(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
"""Test repair proceeds best it can through failures."""
fail_err = aiodocker.DockerError(
HTTPStatus.INTERNAL_SERVER_ERROR, {"message": "fail"}
)
coresys.docker.docker.containers.prune.side_effect = fail_err
coresys.docker.docker.images.prune.side_effect = fail_err
coresys.docker.docker.images.prune_builds.side_effect = fail_err
coresys.docker.docker.volumes.prune.side_effect = fail_err
coresys.docker.docker.networks.prune.side_effect = fail_err
coresys.docker.docker.networks.get.side_effect = missing_err = (
aiodocker.DockerError(HTTPStatus.NOT_FOUND, {"message": "missing"})
coresys.docker.dockerpy.api.prune_containers.side_effect = APIError("fail")
coresys.docker.dockerpy.api.prune_images.side_effect = APIError("fail")
coresys.docker.dockerpy.api.prune_builds.side_effect = APIError("fail")
coresys.docker.dockerpy.api.prune_volumes.side_effect = APIError("fail")
coresys.docker.dockerpy.api.prune_networks.side_effect = APIError("fail")
coresys.docker.docker.networks.get.side_effect = err = aiodocker.DockerError(
HTTPStatus.NOT_FOUND, {"message": "missing"}
)
await coresys.docker.repair()
assert f"Error for containers prune: {fail_err!s}" in caplog.text
assert f"Error for images prune: {fail_err!s}" in caplog.text
assert f"Error for builds prune: {fail_err!s}" in caplog.text
assert f"Error for volumes prune: {fail_err!s}" in caplog.text
assert f"Error for networks prune: {fail_err!s}" in caplog.text
assert f"Error for networks hassio prune: {missing_err!s}" in caplog.text
assert f"Error for networks host prune: {missing_err!s}" in caplog.text
assert "Error for containers prune: fail" in caplog.text
assert "Error for images prune: fail" in caplog.text
assert "Error for builds prune: fail" in caplog.text
assert "Error for volumes prune: fail" in caplog.text
assert "Error for networks prune: fail" in caplog.text
assert f"Error for networks hassio prune: {err!s}" in caplog.text
assert f"Error for networks host prune: {err!s}" in caplog.text
@pytest.mark.parametrize("log_starter", [("Loaded image ID"), ("Loaded image")])

View File

@@ -1,6 +1,5 @@
"""Test Home Assistant core."""
import asyncio
from datetime import datetime, timedelta
from http import HTTPStatus
from unittest.mock import ANY, MagicMock, Mock, PropertyMock, call, patch
@@ -207,58 +206,6 @@ async def test_install_other_error(
assert "Unhandled exception:" not in caplog.text
@pytest.mark.parametrize(
    ("active_job", "expected_log"),
    [
        (None, "Home Assistant Core installation in progress"),
        (MagicMock(progress=45.0), "Downloading Home Assistant Core image, 45%"),
    ],
)
async def test_install_logs_progress_periodically(
    coresys: CoreSys,
    caplog: pytest.LogCaptureFixture,
    active_job: MagicMock | None,
    expected_log: str,
):
    """Test install logs progress periodically during image pull.

    Parametrized on the docker interface's active_job: with no job the
    generic "installation in progress" line is expected; with a job that
    exposes progress, the percentage line is expected.
    """
    # NOTE(review): presumably relaxes content-trust checks so the mocked
    # pull is accepted — confirm against the security module.
    coresys.security.force = True
    # The mocked pull yields a single empty status event and completes.
    coresys.docker.images.pull.return_value = AsyncIterator([{}])
    original_wait_for = asyncio.wait_for
    async def mock_wait_for(coro, *, timeout=None):
        """Immediately timeout for the progress log wait, pass through others."""
        # timeout == 15 identifies the periodic progress-log wait; make it
        # "expire" at once so the progress line is emitted without delay.
        if timeout == 15:
            coro.close()  # avoid a "coroutine was never awaited" warning
            await asyncio.sleep(0)
            raise TimeoutError
        return await original_wait_for(coro, timeout=timeout)
    # Stub out start/cleanup and pin image/version/arch so install() runs
    # the pull path deterministically with the parametrized active_job.
    with (
        patch.object(HomeAssistantCore, "start"),
        patch.object(DockerHomeAssistant, "cleanup"),
        patch.object(
            Updater,
            "image_homeassistant",
            new=PropertyMock(return_value="homeassistant"),
        ),
        patch.object(
            Updater, "version_homeassistant", new=PropertyMock(return_value="2022.7.3")
        ),
        patch.object(
            DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
        ),
        patch("supervisor.homeassistant.core.asyncio.wait_for", new=mock_wait_for),
        patch.object(
            DockerHomeAssistant,
            "active_job",
            new=PropertyMock(return_value=active_job),
        ),
    ):
        await coresys.homeassistant.core.install()
    assert expected_log in caplog.text
@pytest.mark.parametrize(
("container_exc", "image_exc", "delete_calls"),
[

View File

@@ -58,11 +58,12 @@ async def test_load(
assert ha_ws_client.async_send_command.call_args_list[0][0][0] == {"lorem": "ipsum"}
async def test_list_users_none(coresys: CoreSys, ha_ws_client: AsyncMock):
"""Test list users raises on unexpected None response from Core."""
async def test_get_users_none(coresys: CoreSys, ha_ws_client: AsyncMock):
"""Test get users returning none does not fail."""
ha_ws_client.async_send_command.return_value = None
with pytest.raises(TypeError):
await coresys.homeassistant.list_users()
assert (
await coresys.homeassistant.get_users.__wrapped__(coresys.homeassistant) == []
)
async def test_write_pulse_error(coresys: CoreSys, caplog: pytest.LogCaptureFixture):

View File

@@ -8,7 +8,7 @@ import pytest
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import HomeAssistantWSConnectionError
from supervisor.exceptions import HomeAssistantWSError
from supervisor.homeassistant.const import WSEvent, WSType
@@ -81,7 +81,7 @@ async def test_send_command_core_not_reachable(
ha_ws_client.connected = False
with (
patch.object(coresys.homeassistant.api, "check_api_state", return_value=False),
pytest.raises(HomeAssistantWSConnectionError, match="not reachable"),
pytest.raises(HomeAssistantWSError, match="not reachable"),
):
await coresys.homeassistant.websocket.async_send_command({"type": "test"})
@@ -102,7 +102,7 @@ async def test_fire_and_forget_core_not_reachable(
async def test_send_command_during_shutdown(coresys: CoreSys, ha_ws_client: AsyncMock):
"""Test async_send_command raises during shutdown."""
await coresys.core.set_state(CoreState.SHUTDOWN)
with pytest.raises(HomeAssistantWSConnectionError, match="shutting down"):
with pytest.raises(HomeAssistantWSError, match="shutting down"):
await coresys.homeassistant.websocket.async_send_command({"type": "test"})
ha_ws_client.async_send_command.assert_not_called()

View File

@@ -1,11 +1,10 @@
"""Test ingress."""
from datetime import timedelta
import json
from pathlib import Path
from unittest.mock import ANY, patch
from supervisor.const import HomeAssistantUser, IngressSessionData
from supervisor.const import IngressSessionData, IngressSessionDataUser
from supervisor.coresys import CoreSys
from supervisor.ingress import Ingress
from supervisor.utils.dt import utc_from_timestamp
@@ -35,7 +34,7 @@ def test_session_handling(coresys: CoreSys):
def test_session_handling_with_session_data(coresys: CoreSys):
"""Create and test session."""
session = coresys.ingress.create_session(
IngressSessionData(HomeAssistantUser("some-id"))
IngressSessionData(IngressSessionDataUser("some-id"))
)
assert session
@@ -77,7 +76,7 @@ async def test_ingress_save_data(coresys: CoreSys, tmp_supervisor_data: Path):
with patch("supervisor.ingress.FILE_HASSIO_INGRESS", new=config_file):
ingress = await Ingress(coresys).load_config()
session = ingress.create_session(
IngressSessionData(HomeAssistantUser("123", name="Test", username="test"))
IngressSessionData(IngressSessionDataUser("123", "Test", "test"))
)
await ingress.save_data()
@@ -88,47 +87,12 @@ async def test_ingress_save_data(coresys: CoreSys, tmp_supervisor_data: Path):
assert await coresys.run_in_executor(get_config) == {
"session": {session: ANY},
"session_data": {
session: {"user": {"id": "123", "name": "Test", "username": "test"}}
session: {"user": {"id": "123", "displayname": "Test", "username": "test"}}
},
"ports": {},
}
async def test_ingress_load_legacy_displayname(
    coresys: CoreSys, tmp_supervisor_data: Path
):
    """Test loading session data with legacy 'displayname' key.

    Older persisted ingress data stored the user's display name under the
    JSON key "displayname"; loading such a file must still populate the
    user's name field.
    """
    config_file = tmp_supervisor_data / "ingress.json"
    # Dummy 128-character session token.
    session_token = "a" * 128
    # Hand-write an ingress.json in the legacy on-disk shape; the value
    # 9999999999.0 is presumably a far-future validity timestamp so the
    # session is not discarded on load — confirm against Ingress.
    config_file.write_text(
        json.dumps(
            {
                "session": {session_token: 9999999999.0},
                "session_data": {
                    session_token: {
                        "user": {
                            "id": "456",
                            "displayname": "Legacy Name",
                            "username": "legacy",
                        }
                    }
                },
                "ports": {},
            }
        )
    )
    # Point the ingress store at the temp file and load it.
    with patch("supervisor.ingress.FILE_HASSIO_INGRESS", new=config_file):
        ingress = await Ingress(coresys).load_config()
    session_data = ingress.get_session_data(session_token)
    assert session_data is not None
    assert session_data.user.id == "456"
    # Legacy "displayname" maps onto the current user.name attribute.
    assert session_data.user.name == "Legacy Name"
    assert session_data.user.username == "legacy"
async def test_ingress_reload_ignore_none_data(coresys: CoreSys):
"""Test reloading ingress does not add None for session data and create errors."""
session = coresys.ingress.create_session()