mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-02-21 01:37:32 +00:00
Compare commits
4 Commits
backup-use
...
2026.02.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50e6c88237 | ||
|
|
0cce2dad3c | ||
|
|
8dd42cb7a0 | ||
|
|
590674ba7c |
6
.github/workflows/builder.yml
vendored
6
.github/workflows/builder.yml
vendored
@@ -296,7 +296,11 @@ jobs:
|
||||
- &wait_for_supervisor
|
||||
name: Wait for Supervisor to come up
|
||||
run: |
|
||||
SUPERVISOR=$(docker inspect --format='{{.NetworkSettings.IPAddress}}' hassio_supervisor)
|
||||
until SUPERVISOR=$(docker inspect --format='{{.NetworkSettings.Networks.hassio.IPAddress}}' hassio_supervisor 2>/dev/null) && \
|
||||
[ -n "$SUPERVISOR" ] && [ "$SUPERVISOR" != "<no value>" ]; do
|
||||
echo "Waiting for network configuration..."
|
||||
sleep 1
|
||||
done
|
||||
echo "Waiting for Supervisor API at http://${SUPERVISOR}/supervisor/ping"
|
||||
timeout=300
|
||||
elapsed=0
|
||||
|
||||
@@ -8,7 +8,7 @@ pytest-asyncio==1.3.0
|
||||
pytest-cov==7.0.0
|
||||
pytest-timeout==2.4.0
|
||||
pytest==9.0.2
|
||||
ruff==0.15.0
|
||||
ruff==0.15.1
|
||||
time-machine==3.2.0
|
||||
types-docker==7.1.0.20260109
|
||||
types-pyyaml==6.0.12.20250915
|
||||
|
||||
@@ -20,7 +20,7 @@ from typing import Any, Final, cast
|
||||
import aiohttp
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
||||
from deepmerge import Merger
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -76,7 +76,6 @@ from ..exceptions import (
|
||||
AddonsError,
|
||||
AddonsJobError,
|
||||
AddonUnknownError,
|
||||
BackupInvalidError,
|
||||
BackupRestoreUnknownError,
|
||||
ConfigurationFileError,
|
||||
DockerBuildError,
|
||||
@@ -1445,11 +1444,10 @@ class Addon(AddonModel):
|
||||
tmp = TemporaryDirectory(dir=self.sys_config.path_tmp)
|
||||
try:
|
||||
with tar_file as backup:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of malicious backups with such exploits.
|
||||
backup.extractall(
|
||||
path=tmp.name,
|
||||
filter="tar",
|
||||
members=secure_path(backup),
|
||||
filter="fully_trusted",
|
||||
)
|
||||
|
||||
data = read_json_file(Path(tmp.name, "addon.json"))
|
||||
@@ -1461,12 +1459,8 @@ class Addon(AddonModel):
|
||||
|
||||
try:
|
||||
tmp, data = await self.sys_run_in_executor(_extract_tarfile)
|
||||
except tarfile.FilterError as err:
|
||||
raise BackupInvalidError(
|
||||
f"Can't extract backup tarfile for {self.slug}: {err}",
|
||||
_LOGGER.error,
|
||||
) from err
|
||||
except tarfile.TarError as err:
|
||||
_LOGGER.error("Can't extract backup tarfile for %s: %s", self.slug, err)
|
||||
raise BackupRestoreUnknownError() from err
|
||||
except ConfigurationFileError as err:
|
||||
raise AddonUnknownError(addon=self.slug) from err
|
||||
|
||||
@@ -18,7 +18,7 @@ import time
|
||||
from typing import Any, Self, cast
|
||||
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -512,24 +512,12 @@ class Backup(JobGroup):
|
||||
)
|
||||
tmp = TemporaryDirectory(dir=str(backup_tarfile.parent))
|
||||
|
||||
try:
|
||||
with tarfile.open(backup_tarfile, "r:") as tar:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
tar.extractall(
|
||||
path=tmp.name,
|
||||
filter="tar",
|
||||
)
|
||||
except tarfile.FilterError as err:
|
||||
raise BackupInvalidError(
|
||||
f"Can't read backup tarfile {backup_tarfile.as_posix()}: {err}",
|
||||
_LOGGER.error,
|
||||
) from err
|
||||
except tarfile.TarError as err:
|
||||
raise BackupError(
|
||||
f"Can't read backup tarfile {backup_tarfile.as_posix()}: {err}",
|
||||
_LOGGER.error,
|
||||
) from err
|
||||
with tarfile.open(backup_tarfile, "r:") as tar:
|
||||
tar.extractall(
|
||||
path=tmp.name,
|
||||
members=secure_path(tar),
|
||||
filter="fully_trusted",
|
||||
)
|
||||
|
||||
return tmp
|
||||
|
||||
@@ -810,17 +798,10 @@ class Backup(JobGroup):
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
) as tar_file:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
tar_file.extractall(
|
||||
path=origin_dir,
|
||||
filter="tar",
|
||||
path=origin_dir, members=tar_file, filter="fully_trusted"
|
||||
)
|
||||
_LOGGER.info("Restore folder %s done", name)
|
||||
except tarfile.FilterError as err:
|
||||
raise BackupInvalidError(
|
||||
f"Can't restore folder {name}: {err}", _LOGGER.warning
|
||||
) from err
|
||||
except (tarfile.TarError, OSError) as err:
|
||||
raise BackupError(
|
||||
f"Can't restore folder {name}: {err}", _LOGGER.warning
|
||||
|
||||
@@ -9,7 +9,7 @@ from dataclasses import dataclass
|
||||
import errno
|
||||
from functools import partial
|
||||
from http import HTTPStatus
|
||||
from io import BufferedWriter
|
||||
from io import BufferedReader, BufferedWriter
|
||||
from ipaddress import IPv4Address
|
||||
import json
|
||||
import logging
|
||||
@@ -1025,13 +1025,30 @@ class DockerAPI(CoreSysAttributes):
|
||||
|
||||
async def import_image(self, tar_file: Path) -> dict[str, Any] | None:
|
||||
"""Import a tar file as image."""
|
||||
image_tar_stream: BufferedReader | None = None
|
||||
try:
|
||||
with tar_file.open("rb") as read_tar:
|
||||
resp: list[dict[str, Any]] = await self.images.import_image(read_tar)
|
||||
except (aiodocker.DockerError, OSError) as err:
|
||||
# Lambda avoids need for a cast here. Since return type of open is based on mode
|
||||
image_tar_stream = await self.sys_run_in_executor(
|
||||
lambda: tar_file.open("rb")
|
||||
)
|
||||
resp: list[dict[str, Any]] = await self.images.import_image(
|
||||
image_tar_stream
|
||||
)
|
||||
except aiodocker.DockerError as err:
|
||||
raise DockerError(
|
||||
f"Can't import image from tar: {err}", _LOGGER.error
|
||||
) from err
|
||||
except OSError as err:
|
||||
if err.errno == errno.EBADMSG:
|
||||
self.sys_resolution.add_unhealthy_reason(
|
||||
UnhealthyReason.OSERROR_BAD_MESSAGE
|
||||
)
|
||||
raise DockerError(
|
||||
f"Can't read tar file {tar_file}: {err}", _LOGGER.error
|
||||
) from err
|
||||
finally:
|
||||
if image_tar_stream:
|
||||
await self.sys_run_in_executor(image_tar_stream.close)
|
||||
|
||||
docker_image_list: list[str] = []
|
||||
for chunk in resp:
|
||||
@@ -1066,12 +1083,13 @@ class DockerAPI(CoreSysAttributes):
|
||||
image_tar_stream: BufferedWriter | None = None
|
||||
|
||||
try:
|
||||
image_tar_stream = image_writer = cast(
|
||||
BufferedWriter, await self.sys_run_in_executor(tar_file.open, "wb")
|
||||
# Lambda avoids need for a cast here. Since return type of open is based on mode
|
||||
image_tar_stream = await self.sys_run_in_executor(
|
||||
lambda: tar_file.open("wb")
|
||||
)
|
||||
async with self.images.export_image(f"{image}:{version}") as content:
|
||||
async for chunk in content.iter_chunked(DEFAULT_CHUNK_SIZE):
|
||||
await self.sys_run_in_executor(image_writer.write, chunk)
|
||||
await self.sys_run_in_executor(image_tar_stream.write, chunk)
|
||||
except aiodocker.DockerError as err:
|
||||
raise DockerError(
|
||||
f"Can't fetch image {image}:{version}: {err}", _LOGGER.error
|
||||
|
||||
@@ -182,28 +182,53 @@ class HomeAssistantCore(JobGroup):
|
||||
concurrency=JobConcurrency.GROUP_REJECT,
|
||||
)
|
||||
async def install(self) -> None:
|
||||
"""Install a landing page."""
|
||||
"""Install Home Assistant Core."""
|
||||
_LOGGER.info("Home Assistant setup")
|
||||
while True:
|
||||
# read homeassistant tag and install it
|
||||
if not self.sys_homeassistant.latest_version:
|
||||
await self.sys_updater.reload()
|
||||
stop_progress_log = asyncio.Event()
|
||||
|
||||
if to_version := self.sys_homeassistant.latest_version:
|
||||
async def _periodic_progress_log() -> None:
|
||||
"""Log installation progress periodically for user visibility."""
|
||||
while not stop_progress_log.is_set():
|
||||
try:
|
||||
await self.instance.update(
|
||||
to_version,
|
||||
image=self.sys_updater.image_homeassistant,
|
||||
)
|
||||
self.sys_homeassistant.version = self.instance.version or to_version
|
||||
break
|
||||
except (DockerError, JobException):
|
||||
pass
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
await async_capture_exception(err)
|
||||
await asyncio.wait_for(stop_progress_log.wait(), timeout=15)
|
||||
except TimeoutError:
|
||||
if (job := self.instance.active_job) and job.progress:
|
||||
_LOGGER.info(
|
||||
"Downloading Home Assistant Core image, %d%%",
|
||||
int(job.progress),
|
||||
)
|
||||
else:
|
||||
_LOGGER.info("Home Assistant Core installation in progress")
|
||||
|
||||
_LOGGER.warning("Error on Home Assistant installation. Retrying in 30sec")
|
||||
await asyncio.sleep(30)
|
||||
progress_task = self.sys_create_task(_periodic_progress_log())
|
||||
try:
|
||||
while True:
|
||||
# read homeassistant tag and install it
|
||||
if not self.sys_homeassistant.latest_version:
|
||||
await self.sys_updater.reload()
|
||||
|
||||
if to_version := self.sys_homeassistant.latest_version:
|
||||
try:
|
||||
await self.instance.update(
|
||||
to_version,
|
||||
image=self.sys_updater.image_homeassistant,
|
||||
)
|
||||
self.sys_homeassistant.version = (
|
||||
self.instance.version or to_version
|
||||
)
|
||||
break
|
||||
except (DockerError, JobException):
|
||||
pass
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
await async_capture_exception(err)
|
||||
|
||||
_LOGGER.warning(
|
||||
"Error on Home Assistant installation. Retrying in 30sec"
|
||||
)
|
||||
await asyncio.sleep(30)
|
||||
finally:
|
||||
stop_progress_log.set()
|
||||
await progress_task
|
||||
|
||||
_LOGGER.info("Home Assistant docker now installed")
|
||||
self.sys_homeassistant.set_image(self.sys_updater.image_homeassistant)
|
||||
|
||||
@@ -13,7 +13,7 @@ from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionException
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -40,7 +40,6 @@ from ..const import (
|
||||
)
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import (
|
||||
BackupInvalidError,
|
||||
ConfigurationFileError,
|
||||
HomeAssistantBackupError,
|
||||
HomeAssistantError,
|
||||
@@ -496,16 +495,11 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
|
||||
# extract backup
|
||||
try:
|
||||
with tar_file as backup:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
backup.extractall(
|
||||
path=temp_path,
|
||||
filter="tar",
|
||||
members=secure_path(backup),
|
||||
filter="fully_trusted",
|
||||
)
|
||||
except tarfile.FilterError as err:
|
||||
raise BackupInvalidError(
|
||||
f"Invalid tarfile {tar_file}: {err}", _LOGGER.error
|
||||
) from err
|
||||
except tarfile.TarError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Can't read tarfile {tar_file}: {err}", _LOGGER.error
|
||||
|
||||
@@ -1,170 +0,0 @@
|
||||
"""Security tests for backup tar extraction with tar filter."""
|
||||
|
||||
import io
|
||||
from pathlib import Path
|
||||
import tarfile
|
||||
|
||||
import pytest
|
||||
from securetar import SecureTarFile
|
||||
|
||||
from supervisor.backups.backup import Backup
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.exceptions import BackupInvalidError
|
||||
|
||||
|
||||
def _create_tar_gz(
|
||||
path: Path,
|
||||
members: list[tarfile.TarInfo],
|
||||
file_data: dict[str, bytes] | None = None,
|
||||
) -> None:
|
||||
"""Create a tar.gz file with specified members."""
|
||||
if file_data is None:
|
||||
file_data = {}
|
||||
with tarfile.open(path, "w:gz") as tar:
|
||||
for info in members:
|
||||
data = file_data.get(info.name)
|
||||
if data is not None:
|
||||
tar.addfile(info, io.BytesIO(data))
|
||||
else:
|
||||
tar.addfile(info)
|
||||
|
||||
|
||||
def test_path_traversal_rejected(tmp_path: Path):
|
||||
"""Test that path traversal in member names is rejected."""
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with (
|
||||
tarfile.open(tar_path, "r:gz") as tar,
|
||||
pytest.raises(tarfile.OutsideDestinationError),
|
||||
):
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
|
||||
def test_symlink_write_through_rejected(tmp_path: Path):
|
||||
"""Test that writing through a symlink to outside destination is rejected.
|
||||
|
||||
The tar filter's realpath check follows already-extracted symlinks on disk,
|
||||
catching write-through attacks even without explicit link target validation.
|
||||
"""
|
||||
# Symlink pointing outside, then a file entry writing through it
|
||||
link_info = tarfile.TarInfo(name="escape")
|
||||
link_info.type = tarfile.SYMTYPE
|
||||
link_info.linkname = "../outside"
|
||||
file_info = tarfile.TarInfo(name="escape/evil.py")
|
||||
file_info.size = 9
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(
|
||||
tar_path,
|
||||
[link_info, file_info],
|
||||
{"escape/evil.py": b"malicious"},
|
||||
)
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with (
|
||||
tarfile.open(tar_path, "r:gz") as tar,
|
||||
pytest.raises(tarfile.OutsideDestinationError),
|
||||
):
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
# The evil file must not exist outside the destination
|
||||
assert not (tmp_path / "outside" / "evil.py").exists()
|
||||
|
||||
|
||||
def test_absolute_name_stripped_and_extracted(tmp_path: Path):
|
||||
"""Test that absolute member names have leading / stripped and extract safely."""
|
||||
info = tarfile.TarInfo(name="/etc/test.conf")
|
||||
info.size = 5
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [info], {"/etc/test.conf": b"hello"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
# Extracted inside destination with leading / stripped
|
||||
assert (dest / "etc" / "test.conf").read_text() == "hello"
|
||||
|
||||
|
||||
def test_valid_backup_with_internal_symlinks(tmp_path: Path):
|
||||
"""Test that valid backups with internal relative symlinks extract correctly."""
|
||||
dir_info = tarfile.TarInfo(name="subdir")
|
||||
dir_info.type = tarfile.DIRTYPE
|
||||
dir_info.mode = 0o755
|
||||
|
||||
file_info = tarfile.TarInfo(name="subdir/config.yaml")
|
||||
file_info.size = 11
|
||||
|
||||
link_info = tarfile.TarInfo(name="config_link")
|
||||
link_info.type = tarfile.SYMTYPE
|
||||
link_info.linkname = "subdir/config.yaml"
|
||||
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(
|
||||
tar_path,
|
||||
[dir_info, file_info, link_info],
|
||||
{"subdir/config.yaml": b"key: value\n"},
|
||||
)
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
assert (dest / "subdir" / "config.yaml").read_text() == "key: value\n"
|
||||
assert (dest / "config_link").is_symlink()
|
||||
assert (dest / "config_link").read_text() == "key: value\n"
|
||||
|
||||
|
||||
def test_uid_gid_preserved(tmp_path: Path):
|
||||
"""Test that tar filter preserves file ownership."""
|
||||
info = tarfile.TarInfo(name="owned_file.txt")
|
||||
info.size = 5
|
||||
info.uid = 1000
|
||||
info.gid = 1000
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [info], {"owned_file.txt": b"hello"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
# Extract member via filter only (don't actually extract, just check
|
||||
# the filter preserves uid/gid)
|
||||
for member in tar:
|
||||
filtered = tarfile.tar_filter(member, str(dest))
|
||||
assert filtered.uid == 1000
|
||||
assert filtered.gid == 1000
|
||||
|
||||
|
||||
async def test_backup_open_rejects_path_traversal(coresys: CoreSys, tmp_path: Path):
|
||||
"""Test that Backup.open() raises BackupInvalidError for path traversal."""
|
||||
tar_path = tmp_path / "malicious.tar"
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
with tarfile.open(tar_path, "w:") as tar:
|
||||
tar.addfile(traversal_info, io.BytesIO(b"malicious"))
|
||||
|
||||
backup = Backup(coresys, tar_path, "test", None)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
async with backup.open(None):
|
||||
pass
|
||||
|
||||
|
||||
async def test_homeassistant_restore_rejects_path_traversal(
|
||||
coresys: CoreSys, tmp_supervisor_data: Path
|
||||
):
|
||||
"""Test that Home Assistant restore raises BackupInvalidError for path traversal."""
|
||||
tar_path = tmp_supervisor_data / "homeassistant.tar.gz"
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
tar_file = SecureTarFile(tar_path, "r", gzip=True)
|
||||
with pytest.raises(BackupInvalidError):
|
||||
await coresys.homeassistant.restore(tar_file)
|
||||
@@ -18,7 +18,7 @@ from aiodocker.system import DockerSystem
|
||||
from aiohttp import ClientSession, web
|
||||
from aiohttp.test_utils import TestClient
|
||||
from awesomeversion import AwesomeVersion
|
||||
from blockbuster import BlockBuster, blockbuster_ctx
|
||||
from blockbuster import BlockBuster, BlockBusterFunction
|
||||
from dbus_fast import BusType
|
||||
from dbus_fast.aio.message_bus import MessageBus
|
||||
import pytest
|
||||
@@ -94,9 +94,17 @@ def blockbuster(request: pytest.FixtureRequest) -> BlockBuster | None:
|
||||
# But it will ignore calls to libraries and such that do blocking I/O directly from tests
|
||||
# Removing that would be nice but a todo for the future
|
||||
|
||||
# pylint: disable-next=contextmanager-generator-missing-cleanup
|
||||
with blockbuster_ctx(scanned_modules=["supervisor"]) as bb:
|
||||
yield bb
|
||||
SCANNED_MODULES = ["supervisor"]
|
||||
blockbuster = BlockBuster(scanned_modules=SCANNED_MODULES)
|
||||
blockbuster.functions["pathlib.Path.open"] = BlockBusterFunction(
|
||||
Path, "open", scanned_modules=SCANNED_MODULES
|
||||
)
|
||||
blockbuster.functions["pathlib.Path.close"] = BlockBusterFunction(
|
||||
Path, "close", scanned_modules=SCANNED_MODULES
|
||||
)
|
||||
blockbuster.activate()
|
||||
yield blockbuster
|
||||
blockbuster.deactivate()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
16
tests/fixtures/container_attrs.json
vendored
16
tests/fixtures/container_attrs.json
vendored
@@ -210,28 +210,14 @@
|
||||
}
|
||||
},
|
||||
"NetworkSettings": {
|
||||
"Bridge": "",
|
||||
"SandboxID": "067cd11a63f96d227dcc0f01d3e4f5053c368021becd0b4b2da4f301cfda3d29",
|
||||
"HairpinMode": false,
|
||||
"LinkLocalIPv6Address": "",
|
||||
"LinkLocalIPv6PrefixLen": 0,
|
||||
"SandboxKey": "/var/run/docker/netns/067cd11a63f9",
|
||||
"Ports": {
|
||||
"1883/tcp": [
|
||||
{ "HostIp": "0.0.0.0", "HostPort": "1883" },
|
||||
{ "HostIp": "::", "HostPort": "1883" }
|
||||
]
|
||||
},
|
||||
"SandboxKey": "/var/run/docker/netns/067cd11a63f9",
|
||||
"SecondaryIPAddresses": null,
|
||||
"SecondaryIPv6Addresses": null,
|
||||
"EndpointID": "",
|
||||
"Gateway": "",
|
||||
"GlobalIPv6Address": "",
|
||||
"GlobalIPv6PrefixLen": 0,
|
||||
"IPAddress": "",
|
||||
"IPPrefixLen": 0,
|
||||
"IPv6Gateway": "",
|
||||
"MacAddress": "",
|
||||
"Networks": {
|
||||
"hassio": {
|
||||
"IPAMConfig": null,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Test Home Assistant core."""
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
from http import HTTPStatus
|
||||
from unittest.mock import ANY, MagicMock, Mock, PropertyMock, call, patch
|
||||
@@ -206,6 +207,58 @@ async def test_install_other_error(
|
||||
assert "Unhandled exception:" not in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("active_job", "expected_log"),
|
||||
[
|
||||
(None, "Home Assistant Core installation in progress"),
|
||||
(MagicMock(progress=45.0), "Downloading Home Assistant Core image, 45%"),
|
||||
],
|
||||
)
|
||||
async def test_install_logs_progress_periodically(
|
||||
coresys: CoreSys,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
active_job: MagicMock | None,
|
||||
expected_log: str,
|
||||
):
|
||||
"""Test install logs progress periodically during image pull."""
|
||||
coresys.security.force = True
|
||||
coresys.docker.images.pull.return_value = AsyncIterator([{}])
|
||||
original_wait_for = asyncio.wait_for
|
||||
|
||||
async def mock_wait_for(coro, *, timeout=None):
|
||||
"""Immediately timeout for the progress log wait, pass through others."""
|
||||
if timeout == 15:
|
||||
coro.close()
|
||||
await asyncio.sleep(0)
|
||||
raise TimeoutError
|
||||
return await original_wait_for(coro, timeout=timeout)
|
||||
|
||||
with (
|
||||
patch.object(HomeAssistantCore, "start"),
|
||||
patch.object(DockerHomeAssistant, "cleanup"),
|
||||
patch.object(
|
||||
Updater,
|
||||
"image_homeassistant",
|
||||
new=PropertyMock(return_value="homeassistant"),
|
||||
),
|
||||
patch.object(
|
||||
Updater, "version_homeassistant", new=PropertyMock(return_value="2022.7.3")
|
||||
),
|
||||
patch.object(
|
||||
DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
|
||||
),
|
||||
patch("supervisor.homeassistant.core.asyncio.wait_for", new=mock_wait_for),
|
||||
patch.object(
|
||||
DockerHomeAssistant,
|
||||
"active_job",
|
||||
new=PropertyMock(return_value=active_job),
|
||||
),
|
||||
):
|
||||
await coresys.homeassistant.core.install()
|
||||
|
||||
assert expected_log in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("container_exc", "image_exc", "delete_calls"),
|
||||
[
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Tests for apparmor utility."""
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
@@ -31,13 +32,20 @@ profile test flags=(attach_disconnected,mediate_deleted) {
|
||||
|
||||
async def test_valid_apparmor_file():
|
||||
"""Test a valid apparmor file."""
|
||||
assert validate_profile("example", get_fixture_path("apparmor_valid.txt"))
|
||||
assert await asyncio.get_running_loop().run_in_executor(
|
||||
None, validate_profile, "example", get_fixture_path("apparmor_valid.txt")
|
||||
)
|
||||
|
||||
|
||||
async def test_apparmor_missing_profile(caplog: pytest.LogCaptureFixture):
|
||||
"""Test apparmor file missing profile."""
|
||||
with pytest.raises(AppArmorInvalidError):
|
||||
validate_profile("example", get_fixture_path("apparmor_no_profile.txt"))
|
||||
await asyncio.get_running_loop().run_in_executor(
|
||||
None,
|
||||
validate_profile,
|
||||
"example",
|
||||
get_fixture_path("apparmor_no_profile.txt"),
|
||||
)
|
||||
|
||||
assert (
|
||||
"Missing AppArmor profile inside file: apparmor_no_profile.txt" in caplog.text
|
||||
@@ -47,7 +55,12 @@ async def test_apparmor_missing_profile(caplog: pytest.LogCaptureFixture):
|
||||
async def test_apparmor_multiple_profiles(caplog: pytest.LogCaptureFixture):
|
||||
"""Test apparmor file with too many profiles."""
|
||||
with pytest.raises(AppArmorInvalidError):
|
||||
validate_profile("example", get_fixture_path("apparmor_multiple_profiles.txt"))
|
||||
await asyncio.get_running_loop().run_in_executor(
|
||||
None,
|
||||
validate_profile,
|
||||
"example",
|
||||
get_fixture_path("apparmor_multiple_profiles.txt"),
|
||||
)
|
||||
|
||||
assert (
|
||||
"Too many AppArmor profiles inside file: apparmor_multiple_profiles.txt"
|
||||
|
||||
Reference in New Issue
Block a user