Compare commits

..

1 Commits

Author SHA1 Message Date
Mike Degatano
0786e06eb9 Re-add typeerror handling to format message 2025-03-01 15:52:29 +00:00
85 changed files with 396 additions and 505 deletions

View File

@@ -106,7 +106,7 @@ jobs:
- name: Build wheels
if: needs.init.outputs.requirements == 'true'
uses: home-assistant/wheels@2025.02.0
uses: home-assistant/wheels@2024.11.0
with:
abi: cp313
tag: musllinux_1_2

View File

@@ -12,7 +12,7 @@ jobs:
- name: Check out code from GitHub
uses: actions/checkout@v4.2.2
- name: Sentry Release
uses: getsentry/action-release@v3.0.0
uses: getsentry/action-release@v1.10.4
env:
SENTRY_AUTH_TOKEN: ${{ secrets.SENTRY_AUTH_TOKEN }}
SENTRY_ORG: ${{ secrets.SENTRY_ORG }}

View File

@@ -7,7 +7,7 @@ brotli==1.1.0
ciso8601==2.3.2
colorlog==6.9.0
cpe==1.3.1
cryptography==44.0.2
cryptography==44.0.1
debugpy==1.8.12
deepmerge==2.0
dirhash==0.5.0

View File

@@ -6,8 +6,8 @@ pytest-aiohttp==1.1.0
pytest-asyncio==0.25.2
pytest-cov==6.0.0
pytest-timeout==2.3.1
pytest==8.3.5
ruff==0.9.9
pytest==8.3.4
ruff==0.9.8
time-machine==2.16.0
typing_extensions==4.12.2
urllib3==2.3.0

View File

@@ -140,7 +140,9 @@ class Addon(AddonModel):
super().__init__(coresys, slug)
self.instance: DockerAddon = DockerAddon(coresys, self)
self._state: AddonState = AddonState.UNKNOWN
self._manual_stop: bool = False
self._manual_stop: bool = (
self.sys_hardware.helper.last_boot != self.sys_config.last_boot
)
self._listeners: list[EventListener] = []
self._startup_event = asyncio.Event()
self._startup_task: asyncio.Task | None = None
@@ -214,10 +216,6 @@ class Addon(AddonModel):
async def load(self) -> None:
"""Async initialize of object."""
self._manual_stop = (
await self.sys_hardware.helper.last_boot() != self.sys_config.last_boot
)
if self.is_detached:
await super().refresh_path_cache()
@@ -722,7 +720,7 @@ class Addon(AddonModel):
try:
options = self.schema.validate(self.options)
await self.sys_run_in_executor(write_json_file, self.path_options, options)
write_json_file(self.path_options, options)
except vol.Invalid as ex:
_LOGGER.error(
"Add-on %s has invalid options: %s",
@@ -940,20 +938,19 @@ class Addon(AddonModel):
)
return out
async def write_pulse(self) -> None:
def write_pulse(self) -> None:
"""Write asound config to file and return True on success."""
pulse_config = self.sys_plugins.audio.pulse_client(
input_profile=self.audio_input, output_profile=self.audio_output
)
def write_pulse_config():
# Cleanup wrong maps
if self.path_pulse.is_dir():
shutil.rmtree(self.path_pulse, ignore_errors=True)
self.path_pulse.write_text(pulse_config, encoding="utf-8")
# Cleanup wrong maps
if self.path_pulse.is_dir():
shutil.rmtree(self.path_pulse, ignore_errors=True)
# Write pulse config
try:
await self.sys_run_in_executor(write_pulse_config)
self.path_pulse.write_text(pulse_config, encoding="utf-8")
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE
@@ -1073,7 +1070,7 @@ class Addon(AddonModel):
# Sound
if self.with_audio:
await self.write_pulse()
self.write_pulse()
def _check_addon_config_dir():
if self.path_config.is_dir():

View File

@@ -531,8 +531,6 @@ class APIBackups(CoreSysAttributes):
def close_backup_file() -> None:
if backup_file_stream:
# Make sure it got closed, in case of exception. It is safe to
# close the file stream twice.
backup_file_stream.close()
if temp_dir:
temp_dir.cleanup()
@@ -543,7 +541,6 @@ class APIBackups(CoreSysAttributes):
tar_file = await self.sys_run_in_executor(open_backup_file)
while chunk := await contents.read_chunk(size=2**16):
await self.sys_run_in_executor(backup_file_stream.write, chunk)
await self.sys_run_in_executor(backup_file_stream.close)
backup = await asyncio.shield(
self.sys_backups.import_backup(
@@ -566,7 +563,8 @@ class APIBackups(CoreSysAttributes):
return False
finally:
await self.sys_run_in_executor(close_backup_file)
if temp_dir or backup:
await self.sys_run_in_executor(close_backup_file)
if backup:
return {ATTR_SLUG: backup.slug}

View File

@@ -255,22 +255,16 @@ class APIHost(CoreSysAttributes):
response.content_type = CONTENT_TYPE_TEXT
headers_returned = False
async for cursor, line in journal_logs_reader(resp, log_formatter):
try:
if not headers_returned:
if cursor:
response.headers["X-First-Cursor"] = cursor
response.headers["X-Accel-Buffering"] = "no"
await response.prepare(request)
headers_returned = True
if not headers_returned:
if cursor:
response.headers["X-First-Cursor"] = cursor
response.headers["X-Accel-Buffering"] = "no"
await response.prepare(request)
headers_returned = True
# When client closes the connection while reading busy logs, we
# sometimes get this exception. It should be safe to ignore it.
with suppress(ClientConnectionResetError):
await response.write(line.encode("utf-8") + b"\n")
except ClientConnectionResetError as err:
# When client closes the connection while reading busy logs, we
# sometimes get this exception. It should be safe to ignore it.
_LOGGER.debug(
"ClientConnectionResetError raised when returning journal logs: %s",
err,
)
break
except ConnectionResetError as ex:
raise APIError(
"Connection reset when trying to fetch data from systemd-journald."

View File

@@ -69,21 +69,12 @@ SCHEMA_ADD_REPOSITORY = vol.Schema(
)
def _read_static_text_file(path: Path) -> Any:
"""Read in a static text file asset for API output.
def _read_static_file(path: Path, binary: bool = False) -> Any:
"""Read in a static file asset for API output.
Must be run in executor.
"""
with path.open("r", errors="replace") as asset:
return asset.read()
def _read_static_binary_file(path: Path) -> Any:
"""Read in a static binary file asset for API output.
Must be run in executor.
"""
with path.open("rb") as asset:
with path.open("rb" if binary else "r") as asset:
return asset.read()
@@ -256,7 +247,7 @@ class APIStore(CoreSysAttributes):
if not addon.with_icon:
raise APIError(f"No icon found for add-on {addon.slug}!")
return await self.sys_run_in_executor(_read_static_binary_file, addon.path_icon)
return await self.sys_run_in_executor(_read_static_file, addon.path_icon, True)
@api_process_raw(CONTENT_TYPE_PNG)
async def addons_addon_logo(self, request: web.Request) -> bytes:
@@ -265,7 +256,7 @@ class APIStore(CoreSysAttributes):
if not addon.with_logo:
raise APIError(f"No logo found for add-on {addon.slug}!")
return await self.sys_run_in_executor(_read_static_binary_file, addon.path_logo)
return await self.sys_run_in_executor(_read_static_file, addon.path_logo, True)
@api_process_raw(CONTENT_TYPE_TEXT)
async def addons_addon_changelog(self, request: web.Request) -> str:
@@ -279,9 +270,7 @@ class APIStore(CoreSysAttributes):
if not addon.with_changelog:
return f"No changelog found for add-on {addon.slug}!"
return await self.sys_run_in_executor(
_read_static_text_file, addon.path_changelog
)
return await self.sys_run_in_executor(_read_static_file, addon.path_changelog)
@api_process_raw(CONTENT_TYPE_TEXT)
async def addons_addon_documentation(self, request: web.Request) -> str:
@@ -296,7 +285,7 @@ class APIStore(CoreSysAttributes):
return f"No documentation found for add-on {addon.slug}!"
return await self.sys_run_in_executor(
_read_static_text_file, addon.path_documentation
_read_static_file, addon.path_documentation
)
@api_process

View File

@@ -25,7 +25,7 @@ from ..coresys import CoreSys
from ..exceptions import APIError, BackupFileNotFoundError, DockerAPIError, HassioError
from ..utils import check_exception_chain, get_message_from_exception_chain
from ..utils.json import json_dumps, json_loads as json_loads_util
from ..utils.log_format import format_message
from ..utils.log_format import async_format_message
from . import const
@@ -139,7 +139,7 @@ def api_return_error(
if error and not message:
message = get_message_from_exception_chain(error)
if check_exception_chain(error, DockerAPIError):
message = format_message(message)
message = async_format_message(message)
if not message:
message = "Unknown error, see supervisor"

View File

@@ -50,7 +50,7 @@ class CpuArch(CoreSysAttributes):
async def load(self) -> None:
"""Load data and initialize default arch."""
try:
arch_data = await self.sys_run_in_executor(read_json_file, ARCH_JSON)
arch_data = read_json_file(ARCH_JSON)
except ConfigurationFileError:
_LOGGER.warning("Can't read arch json file from %s", ARCH_JSON)
return

View File

@@ -496,7 +496,7 @@ class BackupManager(FileConfiguration, JobGroup):
addon_start_tasks: list[Awaitable[None]] | None = None
try:
await self.sys_core.set_state(CoreState.FREEZE)
self.sys_core.state = CoreState.FREEZE
async with backup.create():
# HomeAssistant Folder is for v1
@@ -549,7 +549,7 @@ class BackupManager(FileConfiguration, JobGroup):
return backup
finally:
await self.sys_core.set_state(CoreState.RUNNING)
self.sys_core.state = CoreState.RUNNING
@Job(
name="backup_manager_full_backup",
@@ -808,7 +808,7 @@ class BackupManager(FileConfiguration, JobGroup):
)
_LOGGER.info("Full-Restore %s start", backup.slug)
await self.sys_core.set_state(CoreState.FREEZE)
self.sys_core.state = CoreState.FREEZE
try:
# Stop Home-Assistant / Add-ons
@@ -823,7 +823,7 @@ class BackupManager(FileConfiguration, JobGroup):
location=location,
)
finally:
await self.sys_core.set_state(CoreState.RUNNING)
self.sys_core.state = CoreState.RUNNING
if success:
_LOGGER.info("Full-Restore %s done", backup.slug)
@@ -878,7 +878,7 @@ class BackupManager(FileConfiguration, JobGroup):
)
_LOGGER.info("Partial-Restore %s start", backup.slug)
await self.sys_core.set_state(CoreState.FREEZE)
self.sys_core.state = CoreState.FREEZE
try:
success = await self._do_restore(
@@ -890,7 +890,7 @@ class BackupManager(FileConfiguration, JobGroup):
location=location,
)
finally:
await self.sys_core.set_state(CoreState.RUNNING)
self.sys_core.state = CoreState.RUNNING
if success:
_LOGGER.info("Partial-Restore %s done", backup.slug)
@@ -904,7 +904,7 @@ class BackupManager(FileConfiguration, JobGroup):
)
async def freeze_all(self, timeout: float = DEFAULT_FREEZE_TIMEOUT) -> None:
"""Freeze system to prepare for an external backup such as an image snapshot."""
await self.sys_core.set_state(CoreState.FREEZE)
self.sys_core.state = CoreState.FREEZE
# Determine running addons
installed = self.sys_addons.installed.copy()
@@ -957,7 +957,7 @@ class BackupManager(FileConfiguration, JobGroup):
if task
]
finally:
await self.sys_core.set_state(CoreState.RUNNING)
self.sys_core.state = CoreState.RUNNING
self._thaw_event.clear()
self._thaw_task = None

View File

@@ -5,7 +5,6 @@ from collections.abc import Awaitable
from contextlib import suppress
from datetime import timedelta
import logging
from typing import Self
from .const import (
ATTR_STARTUP,
@@ -40,6 +39,7 @@ class Core(CoreSysAttributes):
"""Initialize Supervisor object."""
self.coresys: CoreSys = coresys
self._state: CoreState = CoreState.INITIALIZE
self._write_run_state(self._state)
self.exit_code: int = 0
@property
@@ -57,38 +57,32 @@ class Core(CoreSysAttributes):
"""Return true if the installation is healthy."""
return len(self.sys_resolution.unhealthy) == 0
async def _write_run_state(self):
def _write_run_state(self, new_state: CoreState):
"""Write run state for s6 service supervisor."""
try:
await self.sys_run_in_executor(
RUN_SUPERVISOR_STATE.write_text, str(self._state), encoding="utf-8"
)
RUN_SUPERVISOR_STATE.write_text(str(new_state), encoding="utf-8")
except OSError as err:
_LOGGER.warning(
"Can't update the Supervisor state to %s: %s", self._state, err
"Can't update the Supervisor state to %s: %s", new_state, err
)
async def post_init(self) -> Self:
"""Post init actions that must be done in event loop."""
await self._write_run_state()
return self
async def set_state(self, new_state: CoreState) -> None:
@state.setter
def state(self, new_state: CoreState) -> None:
"""Set core into new state."""
if self._state == new_state:
return
self._write_run_state(new_state)
self._state = new_state
await self._write_run_state()
# Don't attempt to notify anyone on CLOSE as we're about to stop the event loop
if self._state != CoreState.CLOSE:
self.sys_bus.fire_event(BusEvent.SUPERVISOR_STATE_CHANGE, self._state)
if new_state != CoreState.CLOSE:
self.sys_bus.fire_event(BusEvent.SUPERVISOR_STATE_CHANGE, new_state)
# These will be received by HA after startup has completed which won't make sense
if self._state not in STARTING_STATES:
if new_state not in STARTING_STATES:
self.sys_homeassistant.websocket.supervisor_update_event(
"info", {"state": self._state}
"info", {"state": new_state}
)
async def connect(self):
@@ -122,7 +116,7 @@ class Core(CoreSysAttributes):
async def setup(self):
"""Start setting up supervisor orchestration."""
await self.set_state(CoreState.SETUP)
self.state = CoreState.SETUP
# Check internet on startup
await self.sys_supervisor.check_connectivity()
@@ -202,7 +196,7 @@ class Core(CoreSysAttributes):
async def start(self):
"""Start Supervisor orchestration."""
await self.set_state(CoreState.STARTUP)
self.state = CoreState.STARTUP
# Check if system is healthy
if not self.supported:
@@ -229,7 +223,7 @@ class Core(CoreSysAttributes):
try:
# HomeAssistant is already running, only Supervisor restarted
if await self.sys_hardware.helper.last_boot() == self.sys_config.last_boot:
if self.sys_hardware.helper.last_boot == self.sys_config.last_boot:
_LOGGER.info("Detected Supervisor restart")
return
@@ -288,7 +282,7 @@ class Core(CoreSysAttributes):
self.sys_create_task(self.sys_updater.reload())
self.sys_create_task(self.sys_resolution.healthcheck())
await self.set_state(CoreState.RUNNING)
self.state = CoreState.RUNNING
self.sys_homeassistant.websocket.supervisor_update_event(
"supervisor", {ATTR_STARTUP: "complete"}
)
@@ -303,7 +297,7 @@ class Core(CoreSysAttributes):
return
# don't process scheduler anymore
await self.set_state(CoreState.STOPPING)
self.state = CoreState.STOPPING
# Stage 1
try:
@@ -338,7 +332,7 @@ class Core(CoreSysAttributes):
except TimeoutError:
_LOGGER.warning("Stage 2: Force Shutdown!")
await self.set_state(CoreState.CLOSE)
self.state = CoreState.CLOSE
_LOGGER.info("Supervisor is down - %d", self.exit_code)
self.sys_loop.stop()
@@ -346,7 +340,7 @@ class Core(CoreSysAttributes):
"""Shutdown all running containers in correct order."""
# don't process scheduler anymore
if self.state == CoreState.RUNNING:
await self.set_state(CoreState.SHUTDOWN)
self.state = CoreState.SHUTDOWN
# Shutdown Application Add-ons, using Home Assistant API
await self.sys_addons.shutdown(AddonStartup.APPLICATION)
@@ -368,7 +362,7 @@ class Core(CoreSysAttributes):
async def _update_last_boot(self):
"""Update last boot time."""
self.sys_config.last_boot = await self.sys_hardware.helper.last_boot()
self.sys_config.last_boot = self.sys_hardware.helper.last_boot
await self.sys_config.save_data()
async def _retrieve_whoami(self, with_ssl: bool) -> WhoamiData | None:

View File

@@ -25,7 +25,6 @@ class HwHelper(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Init hardware object."""
self.coresys = coresys
self._last_boot: datetime | None = None
@property
def support_audio(self) -> bool:
@@ -42,15 +41,11 @@ class HwHelper(CoreSysAttributes):
"""Return True if the device have USB ports."""
return bool(self.sys_hardware.filter_devices(subsystem=UdevSubsystem.USB))
async def last_boot(self) -> datetime | None:
@property
def last_boot(self) -> datetime | None:
"""Return last boot time."""
if self._last_boot:
return self._last_boot
try:
stats: str = await self.sys_run_in_executor(
_PROC_STAT.read_text, encoding="utf-8"
)
stats: str = _PROC_STAT.read_text(encoding="utf-8")
except OSError as err:
_LOGGER.error("Can't read stat data: %s", err)
return None
@@ -61,8 +56,7 @@ class HwHelper(CoreSysAttributes):
_LOGGER.error("Can't found last boot time!")
return None
self._last_boot = datetime.fromtimestamp(int(found.group(1)), UTC)
return self._last_boot
return datetime.fromtimestamp(int(found.group(1)), UTC)
def hide_virtual_device(self, udev_device: pyudev.Device) -> bool:
"""Small helper to hide not needed Devices."""

View File

@@ -342,7 +342,7 @@ class HomeAssistantCore(JobGroup):
await self.sys_homeassistant.save_data()
# Write audio settings
await self.sys_homeassistant.write_pulse()
self.sys_homeassistant.write_pulse()
try:
await self.instance.run(restore_job_id=self.sys_backups.current_restore)

View File

@@ -313,20 +313,19 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
BusEvent.HARDWARE_REMOVE_DEVICE, self._hardware_events
)
async def write_pulse(self):
def write_pulse(self):
"""Write asound config to file and return True on success."""
pulse_config = self.sys_plugins.audio.pulse_client(
input_profile=self.audio_input, output_profile=self.audio_output
)
def write_pulse_config():
# Cleanup wrong maps
if self.path_pulse.is_dir():
shutil.rmtree(self.path_pulse, ignore_errors=True)
self.path_pulse.write_text(pulse_config, encoding="utf-8")
# Cleanup wrong maps
if self.path_pulse.is_dir():
shutil.rmtree(self.path_pulse, ignore_errors=True)
# Write pulse config
try:
await self.sys_run_in_executor(write_pulse_config)
self.path_pulse.write_text(pulse_config, encoding="utf-8")
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE

View File

@@ -72,9 +72,7 @@ class LogsControl(CoreSysAttributes):
async def load(self) -> None:
"""Load log control."""
try:
self._default_identifiers = await self.sys_run_in_executor(
read_json_file, SYSLOG_IDENTIFIERS_JSON
)
self._default_identifiers = read_json_file(SYSLOG_IDENTIFIERS_JSON)
except ConfigurationFileError:
_LOGGER.warning(
"Can't read syslog identifiers json file from %s",

View File

@@ -88,9 +88,7 @@ class PluginAudio(PluginBase):
# Initialize Client Template
try:
self.client_template = jinja2.Template(
await self.sys_run_in_executor(
PULSE_CLIENT_TMPL.read_text, encoding="utf-8"
)
PULSE_CLIENT_TMPL.read_text(encoding="utf-8")
)
except OSError as err:
if err.errno == errno.EBADMSG:
@@ -102,17 +100,13 @@ class PluginAudio(PluginBase):
# Setup default asound config
asound = self.sys_config.path_audio.joinpath("asound")
def setup_default_asound():
if not asound.exists():
if not asound.exists():
try:
shutil.copy(ASOUND_TMPL, asound)
try:
await self.sys_run_in_executor(setup_default_asound)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE
_LOGGER.error("Can't create default asound: %s", err)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE
_LOGGER.error("Can't create default asound: %s", err)
@Job(
name="plugin_audio_update",
@@ -129,7 +123,7 @@ class PluginAudio(PluginBase):
async def restart(self) -> None:
"""Restart Audio plugin."""
_LOGGER.info("Restarting Audio plugin")
await self._write_config()
self._write_config()
try:
await self.instance.restart()
except DockerError as err:
@@ -138,7 +132,7 @@ class PluginAudio(PluginBase):
async def start(self) -> None:
"""Run Audio plugin."""
_LOGGER.info("Starting Audio plugin")
await self._write_config()
self._write_config()
try:
await self.instance.run()
except DockerError as err:
@@ -183,11 +177,10 @@ class PluginAudio(PluginBase):
default_sink=output_profile,
)
async def _write_config(self):
def _write_config(self):
"""Write pulse audio config."""
try:
await self.sys_run_in_executor(
write_json_file,
write_json_file(
self.pulse_audio_config,
{
"debug": self.sys_config.logging == LogLevel.DEBUG,

View File

@@ -152,16 +152,15 @@ class PluginDns(PluginBase):
# Initialize CoreDNS Template
try:
self.resolv_template = jinja2.Template(
await self.sys_run_in_executor(RESOLV_TMPL.read_text, encoding="utf-8")
RESOLV_TMPL.read_text(encoding="utf-8")
)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE
_LOGGER.error("Can't read resolve.tmpl: %s", err)
try:
self.hosts_template = jinja2.Template(
await self.sys_run_in_executor(HOSTS_TMPL.read_text, encoding="utf-8")
HOSTS_TMPL.read_text(encoding="utf-8")
)
except OSError as err:
if err.errno == errno.EBADMSG:
@@ -172,7 +171,7 @@ class PluginDns(PluginBase):
await super().load()
# Update supervisor
await self._write_resolv(HOST_RESOLV)
self._write_resolv(HOST_RESOLV)
await self.sys_supervisor.check_connectivity()
async def install(self) -> None:
@@ -196,7 +195,7 @@ class PluginDns(PluginBase):
async def restart(self) -> None:
"""Restart CoreDNS plugin."""
await self._write_config()
self._write_config()
_LOGGER.info("Restarting CoreDNS plugin")
try:
await self.instance.restart()
@@ -205,7 +204,7 @@ class PluginDns(PluginBase):
async def start(self) -> None:
"""Run CoreDNS."""
await self._write_config()
self._write_config()
# Start Instance
_LOGGER.info("Starting CoreDNS plugin")
@@ -274,7 +273,7 @@ class PluginDns(PluginBase):
else:
self._loop = False
async def _write_config(self) -> None:
def _write_config(self) -> None:
"""Write CoreDNS config."""
debug: bool = self.sys_config.logging == LogLevel.DEBUG
dns_servers: list[str] = []
@@ -298,8 +297,7 @@ class PluginDns(PluginBase):
# Write config to plugin
try:
await self.sys_run_in_executor(
write_json_file,
write_json_file(
self.coredns_config,
{
"servers": dns_servers,
@@ -414,7 +412,7 @@ class PluginDns(PluginBase):
_LOGGER.error("Repair of CoreDNS failed")
await async_capture_exception(err)
async def _write_resolv(self, resolv_conf: Path) -> None:
def _write_resolv(self, resolv_conf: Path) -> None:
"""Update/Write resolv.conf file."""
if not self.resolv_template:
_LOGGER.warning(
@@ -429,7 +427,7 @@ class PluginDns(PluginBase):
# Write config back to resolv
try:
await self.sys_run_in_executor(resolv_conf.write_text, data)
resolv_conf.write_text(data)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE

View File

@@ -36,9 +36,6 @@ class EvaluateAppArmor(EvaluateBase):
async def evaluate(self) -> None:
"""Run evaluation."""
try:
apparmor = await self.sys_run_in_executor(
_APPARMOR_KERNEL.read_text, encoding="utf-8"
)
return _APPARMOR_KERNEL.read_text(encoding="utf-8").strip().upper() != "Y"
except OSError:
return True
return apparmor.strip().upper() != "Y"

View File

@@ -34,13 +34,7 @@ class EvaluateLxc(EvaluateBase):
async def evaluate(self):
"""Run evaluation."""
def check_lxc():
with suppress(OSError):
if "container=lxc" in Path("/proc/1/environ").read_text(
encoding="utf-8"
):
return True
return Path("/dev/lxd/sock").exists()
return await self.sys_run_in_executor(check_lxc)
with suppress(OSError):
if "container=lxc" in Path("/proc/1/environ").read_text(encoding="utf-8"):
return True
return Path("/dev/lxd/sock").exists()

View File

@@ -39,7 +39,7 @@ class FixupAddonExecuteRepair(FixupBase):
)
return
_LOGGER.info("Installing image for addon %s", reference)
_LOGGER.info("Installing image for addon %s")
self.attempts += 1
await addon.instance.install(addon.version)

View File

@@ -75,6 +75,8 @@ class StoreManager(CoreSysAttributes, FileConfiguration):
async def load(self) -> None:
"""Start up add-on management."""
await self.data.update()
# Init custom repositories and load add-ons
await self.update_repositories(
self._data[ATTR_REPOSITORIES], add_with_errors=True

View File

@@ -97,9 +97,7 @@ class GitRepo(CoreSysAttributes):
}
try:
_LOGGER.info(
"Cloning add-on %s repository from %s", self.path, self.url
)
_LOGGER.info("Cloning add-on %s repository", self.url)
self.repo = await self.sys_run_in_executor(
ft.partial(
git.Repo.clone_from, self.url, str(self.path), **git_args
@@ -130,14 +128,7 @@ class GitRepo(CoreSysAttributes):
return
async with self.lock:
_LOGGER.info("Update add-on %s repository from %s", self.path, self.url)
try:
git_cmd = git.Git()
await self.sys_run_in_executor(git_cmd.ls_remote, "--heads", self.url)
except git.CommandError as err:
_LOGGER.warning("Wasn't able to update %s repo: %s.", self.url, err)
raise StoreGitError() from err
_LOGGER.info("Update add-on %s repository", self.url)
try:
branch = self.repo.active_branch.name

View File

@@ -48,10 +48,7 @@ json_loads = orjson.loads # pylint: disable=no-member
def write_json_file(jsonfile: Path, data: Any) -> None:
"""Write a JSON file.
Must be run in executor.
"""
"""Write a JSON file."""
try:
with atomic_write(jsonfile, overwrite=True) as fp:
fp.write(
@@ -70,10 +67,7 @@ def write_json_file(jsonfile: Path, data: Any) -> None:
def read_json_file(jsonfile: Path) -> Any:
"""Read a JSON file and return a dict.
Must be run in executor.
"""
"""Read a JSON file and return a dict."""
try:
return json_loads(jsonfile.read_bytes())
except (OSError, ValueError, TypeError, UnicodeDecodeError) as err:

View File

@@ -1,8 +1,11 @@
"""Custom log messages."""
import asyncio
import logging
import re
from .sentry import async_capture_exception
_LOGGER: logging.Logger = logging.getLogger(__name__)
RE_BIND_FAILED = re.compile(
@@ -10,12 +13,17 @@ RE_BIND_FAILED = re.compile(
)
def format_message(message: str) -> str:
"""Return a formatted message if it's known."""
match = RE_BIND_FAILED.match(message)
if match:
return (
f"Port '{match.group(1)}' is already in use by something else on the host."
)
def async_format_message(message: str) -> str:
"""Return a formated message if it's known.
Must be called from event loop.
"""
try:
match = RE_BIND_FAILED.match(message)
if match:
return f"Port '{match.group(1)}' is already in use by something else on the host."
except TypeError as err:
_LOGGER.error("The type of message is not a string - %s", err)
asyncio.get_running_loop().create_task(async_capture_exception(err))
return message

View File

@@ -5,7 +5,6 @@ from functools import partial
import logging
from typing import Any
from aiohttp.web_exceptions import HTTPBadGateway, HTTPServiceUnavailable
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.asyncio import AsyncioIntegration
@@ -34,15 +33,7 @@ def init_sentry(coresys: CoreSys) -> None:
auto_enabling_integrations=False,
default_integrations=False,
integrations=[
AioHttpIntegration(
failed_request_status_codes=frozenset(range(500, 600))
- set(
{
HTTPBadGateway.status_code,
HTTPServiceUnavailable.status_code,
}
)
),
AioHttpIntegration(),
AsyncioIntegration(),
ExcepthookIntegration(),
DedupeIntegration(),

View File

@@ -20,7 +20,6 @@ from supervisor.docker.addon import DockerAddon
from supervisor.docker.const import ContainerState
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import AddonsError, AddonsJobError, AudioUpdateError
from supervisor.hardware.helper import HwHelper
from supervisor.ingress import Ingress
from supervisor.store.repository import Repository
from supervisor.utils.dt import utcnow
@@ -251,7 +250,11 @@ async def test_watchdog_during_attach(
with (
patch.object(Addon, "restart") as restart,
patch.object(HwHelper, "last_boot", return_value=utcnow()),
patch.object(
type(coresys.hardware.helper),
"last_boot",
new=PropertyMock(return_value=utcnow()),
),
patch.object(DockerAddon, "attach"),
patch.object(
DockerAddon,
@@ -259,9 +262,7 @@ async def test_watchdog_during_attach(
return_value=ContainerState.STOPPED,
),
):
coresys.config.last_boot = (
await coresys.hardware.helper.last_boot() + boot_timedelta
)
coresys.config.last_boot = coresys.hardware.helper.last_boot + boot_timedelta
addon = Addon(coresys, store.slug)
coresys.addons.local[addon.slug] = addon
addon.watchdog = True
@@ -738,7 +739,7 @@ async def test_local_example_ingress_port_set(
assert install_addon_example.ingress_port != 0
async def test_addon_pulse_error(
def test_addon_pulse_error(
coresys: CoreSys,
install_addon_example: Addon,
caplog: pytest.LogCaptureFixture,
@@ -749,14 +750,14 @@ async def test_addon_pulse_error(
"supervisor.addons.addon.Path.write_text", side_effect=(err := OSError())
):
err.errno = errno.EBUSY
await install_addon_example.write_pulse()
install_addon_example.write_pulse()
assert "can't write pulse/client.config" in caplog.text
assert coresys.core.healthy is True
caplog.clear()
err.errno = errno.EBADMSG
await install_addon_example.write_pulse()
install_addon_example.write_pulse()
assert "can't write pulse/client.config" in caplog.text
assert coresys.core.healthy is False

View File

@@ -58,7 +58,7 @@ async def api_token_validation(aiohttp_client, coresys: CoreSys) -> TestClient:
@pytest.mark.asyncio
async def test_api_security_system_initialize(api_system: TestClient, coresys: CoreSys):
"""Test security."""
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
resp = await api_system.get("/supervisor/ping")
result = await resp.json()
@@ -69,7 +69,7 @@ async def test_api_security_system_initialize(api_system: TestClient, coresys: C
@pytest.mark.asyncio
async def test_api_security_system_setup(api_system: TestClient, coresys: CoreSys):
"""Test security."""
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
resp = await api_system.get("/supervisor/ping")
result = await resp.json()
@@ -80,7 +80,7 @@ async def test_api_security_system_setup(api_system: TestClient, coresys: CoreSy
@pytest.mark.asyncio
async def test_api_security_system_running(api_system: TestClient, coresys: CoreSys):
"""Test security."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200
@@ -89,7 +89,7 @@ async def test_api_security_system_running(api_system: TestClient, coresys: Core
@pytest.mark.asyncio
async def test_api_security_system_startup(api_system: TestClient, coresys: CoreSys):
"""Test security."""
await coresys.core.set_state(CoreState.STARTUP)
coresys.core.state = CoreState.STARTUP
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200

View File

@@ -137,7 +137,7 @@ async def test_backup_to_location(
await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
"/backups/new/full",
@@ -178,7 +178,7 @@ async def test_backup_to_default(api_client: TestClient, coresys: CoreSys):
await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
"/backups/new/full",
@@ -196,7 +196,7 @@ async def test_api_freeze_thaw(
api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
"""Test manual freeze and thaw for external backup via API."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
@@ -230,7 +230,7 @@ async def test_api_backup_exclude_database(
exclude_db_setting: bool,
):
"""Test backups exclude the database when specified."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0")
coresys.homeassistant.backups_exclude_database = exclude_db_setting
@@ -278,7 +278,7 @@ async def test_api_backup_restore_background(
tmp_supervisor_data: Path,
):
"""Test background option on backup/restore APIs."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0")
(tmp_supervisor_data / "addons/local").mkdir(parents=True)
@@ -364,7 +364,7 @@ async def test_api_backup_errors(
tmp_supervisor_data: Path,
):
"""Test error reporting in backup job."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0")
(tmp_supervisor_data / "addons/local").mkdir(parents=True)
@@ -435,7 +435,7 @@ async def test_api_backup_errors(
async def test_backup_immediate_errors(api_client: TestClient, coresys: CoreSys):
"""Test backup errors that return immediately even in background mode."""
await coresys.core.set_state(CoreState.FREEZE)
coresys.core.state = CoreState.FREEZE
resp = await api_client.post(
"/backups/new/full",
json={"name": "Test", "background": True},
@@ -443,7 +443,7 @@ async def test_backup_immediate_errors(api_client: TestClient, coresys: CoreSys)
assert resp.status == 400
assert "freeze" in (await resp.json())["message"]
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 0.5
resp = await api_client.post(
"/backups/new/partial",
@@ -460,7 +460,7 @@ async def test_restore_immediate_errors(
mock_partial_backup: Backup,
):
"""Test restore errors that return immediately even in background mode."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
@@ -634,7 +634,7 @@ async def test_backup_to_multiple_locations(
inputs: dict[str, Any],
):
"""Test making a backup to multiple locations."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
@@ -669,7 +669,7 @@ async def test_backup_with_extras(
inputs: dict[str, Any],
):
"""Test backup including extra metdata."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
@@ -909,7 +909,7 @@ async def test_partial_backup_all_addons(
install_addon_ssh: Addon,
):
"""Test backup including extra metdata."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(Backup, "store_addons") as store_addons:
@@ -928,7 +928,7 @@ async def test_restore_backup_from_location(
local_location: str | None,
):
"""Test restoring a backup from a specific location."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Make a backup and a file to test with
@@ -1000,7 +1000,7 @@ async def test_restore_backup_unencrypted_after_encrypted(
# We punt the ball on this one for this PR since this is a rare edge case.
backup.restore_dockerconfig = MagicMock()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Restore encrypted backup
@@ -1050,7 +1050,7 @@ async def test_restore_homeassistant_adds_env(
):
"""Test restoring home assistant from backup adds env to container."""
event = asyncio.Event()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2025.1.0")
backup = await coresys.backups.do_backup_full()
@@ -1134,7 +1134,7 @@ async def test_protected_backup(
api_client: TestClient, coresys: CoreSys, backup_type: str, options: dict[str, Any]
):
"""Test creating a protected backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
@@ -1246,7 +1246,7 @@ async def test_missing_file_removes_location_from_cache(
backup_file: str,
):
"""Test finding a missing file removes the location from cache."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path(backup_file)
@@ -1305,7 +1305,7 @@ async def test_missing_file_removes_backup_from_cache(
backup_file: str,
):
"""Test finding a missing file removes the backup from cache if its the only one."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path(backup_file)
@@ -1331,7 +1331,7 @@ async def test_immediate_list_after_missing_file_restore(
api_client: TestClient, coresys: CoreSys
):
"""Test race with reload for missing file on restore does not error."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path("backup_example.tar")

View File

@@ -128,7 +128,7 @@ async def test_api_resolution_check_options(coresys: CoreSys, api_client: TestCl
@pytest.mark.asyncio
async def test_api_resolution_check_run(coresys: CoreSys, api_client: TestClient):
"""Test client API with run check."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
free_space = coresys.resolution.check.get("free_space")
free_space.run_check = AsyncMock()

View File

@@ -349,41 +349,3 @@ async def test_repository_not_found(api_client: TestClient, method: str, url: st
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Repository bad does not exist in the store"
@pytest.mark.parametrize("resource", ["store/addons", "addons"])
async def test_api_store_addons_documentation_corrupted(
api_client: TestClient, coresys: CoreSys, store_addon: AddonStore, resource: str
):
"""Test /store/addons/{addon}/documentation REST API.
Test add-on with documentation file with byte sequences which cannot be decoded
using UTF-8.
"""
store_addon.path_documentation.write_bytes(b"Text with an invalid UTF-8 char: \xff")
await store_addon.refresh_path_cache()
assert store_addon.with_documentation is True
resp = await api_client.get(f"/{resource}/{store_addon.slug}/documentation")
assert resp.status == 200
result = await resp.text()
assert result == "Text with an invalid UTF-8 char: <20>"
@pytest.mark.parametrize("resource", ["store/addons", "addons"])
async def test_api_store_addons_changelog_corrupted(
api_client: TestClient, coresys: CoreSys, store_addon: AddonStore, resource: str
):
"""Test /store/addons/{addon}/changelog REST API.
Test add-on with changelog file with byte sequences which cannot be decoded
using UTF-8.
"""
store_addon.path_changelog.write_bytes(b"Text with an invalid UTF-8 char: \xff")
await store_addon.refresh_path_cache()
assert store_addon.with_changelog is True
resp = await api_client.get(f"/{resource}/{store_addon.slug}/changelog")
assert resp.status == 200
result = await resp.text()
assert result == "Text with an invalid UTF-8 char: <20>"

View File

@@ -48,7 +48,7 @@ from tests.dbus_service_mocks.systemd_unit import SystemdUnit as SystemdUnitServ
async def test_do_backup_full(coresys: CoreSys, backup_mock, install_addon_ssh):
"""Test creating Backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -81,7 +81,7 @@ async def test_do_backup_full_with_filename(
coresys: CoreSys, filename: str, filename_expected: str, backup_mock
):
"""Test creating Backup with a specific file name."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -99,7 +99,7 @@ async def test_do_backup_full_uncompressed(
coresys: CoreSys, backup_mock, install_addon_ssh
):
"""Test creating Backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -129,7 +129,7 @@ async def test_do_backup_partial_minimal(
coresys: CoreSys, backup_mock, install_addon_ssh
):
"""Test creating minimal partial Backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -156,7 +156,7 @@ async def test_do_backup_partial_minimal_uncompressed(
coresys: CoreSys, backup_mock, install_addon_ssh
):
"""Test creating minimal partial Backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -185,7 +185,7 @@ async def test_do_backup_partial_maximal(
coresys: CoreSys, backup_mock, install_addon_ssh
):
"""Test creating maximal partial Backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -217,7 +217,7 @@ async def test_do_backup_partial_maximal(
async def test_do_restore_full(coresys: CoreSys, full_backup_mock, install_addon_ssh):
"""Test restoring full Backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@@ -248,7 +248,7 @@ async def test_do_restore_full_different_addon(
coresys: CoreSys, full_backup_mock, install_addon_ssh
):
"""Test restoring full Backup with different addons than installed."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@@ -280,7 +280,7 @@ async def test_do_restore_partial_minimal(
coresys: CoreSys, partial_backup_mock, install_addon_ssh
):
"""Test restoring partial Backup minimal."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@@ -303,7 +303,7 @@ async def test_do_restore_partial_minimal(
async def test_do_restore_partial_maximal(coresys: CoreSys, partial_backup_mock):
"""Test restoring partial Backup minimal."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@@ -334,7 +334,7 @@ async def test_fail_invalid_full_backup(
coresys: CoreSys, full_backup_mock: MagicMock, partial_backup_mock: MagicMock
):
"""Test restore fails with invalid backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -366,7 +366,7 @@ async def test_fail_invalid_partial_backup(
coresys: CoreSys, partial_backup_mock: MagicMock
):
"""Test restore fails with invalid backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -403,7 +403,7 @@ async def test_backup_error(
capture_exception: Mock,
):
"""Test error captured when backup fails."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_mock.return_value.store_folders.side_effect = (err := OSError())
@@ -416,7 +416,7 @@ async def test_restore_error(
coresys: CoreSys, full_backup_mock: MagicMock, capture_exception: Mock
):
"""Test restoring full Backup with errors."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None)
@@ -475,7 +475,7 @@ async def test_backup_media_with_mounts(
assert (mount_dir := coresys.config.path_media / "media_test").is_dir()
# Make a partial backup
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["media"])
@@ -532,7 +532,7 @@ async def test_backup_media_with_mounts_retains_files(
)
# Make a partial backup
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["media"])
@@ -599,7 +599,7 @@ async def test_backup_share_with_mounts(
assert (mount_dir := coresys.config.path_share / "share_test").is_dir()
# Make a partial backup
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["share"])
@@ -646,7 +646,7 @@ async def test_full_backup_to_mount(
assert coresys.backups.backup_locations["backup_test"] == mount_dir
# Make a backup and add it to mounts. Confirm it exists in the right place
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_full("test", location=mount)
assert (mount_dir / f"{backup.slug}.tar").exists()
@@ -692,7 +692,7 @@ async def test_partial_backup_to_mount(
assert coresys.backups.backup_locations["backup_test"] == mount_dir
# Make a backup and add it to mounts. Confirm it exists in the right place
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
@@ -746,7 +746,7 @@ async def test_backup_to_down_mount_error(
# Attempt to make a backup which fails because is_mount on directory is false
mock_is_mount.return_value = False
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with pytest.raises(BackupMountDownError):
await coresys.backups.do_backup_full("test", location=mount)
@@ -780,7 +780,7 @@ async def test_backup_to_local_with_default(
coresys.mounts.default_backup_mount = mount
# Make a backup for local. Confirm it exists in the right place
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
@@ -820,7 +820,7 @@ async def test_backup_to_default(
coresys.mounts.default_backup_mount = mount
# Make a backup for default. Confirm it exists in the right place
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(
@@ -861,7 +861,7 @@ async def test_backup_to_default_mount_down_error(
# Attempt to make a backup which fails because is_mount on directory is false
mock_is_mount.return_value = False
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with pytest.raises(BackupMountDownError):
@@ -914,7 +914,7 @@ async def test_backup_with_healthcheck(
container.status = "running"
container.attrs["Config"] = {"Healthcheck": "exists"}
install_addon_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await install_addon_ssh.load()
await asyncio.sleep(0)
@@ -992,7 +992,7 @@ async def test_restore_with_healthcheck(
container.status = "running"
container.attrs["Config"] = {"Healthcheck": "exists"}
install_addon_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await install_addon_ssh.load()
await asyncio.sleep(0)
@@ -1093,7 +1093,7 @@ async def test_backup_progress(
"""Test progress is tracked during backups."""
container.status = "running"
install_addon_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with (
@@ -1193,7 +1193,7 @@ async def test_restore_progress(
container.status = "running"
install_addon_ssh.path_data.mkdir()
install_addon_ssh.state = AddonState.STARTED
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
full_backup: Backup = await coresys.backups.do_backup_full()
@@ -1368,7 +1368,7 @@ async def test_freeze_thaw(
"""Test manual freeze and thaw for external snapshots."""
container.status = "running"
install_addon_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
container.exec_run.return_value = (0, None)
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
@@ -1450,7 +1450,7 @@ async def test_freeze_thaw_timeout(
path_extern,
):
"""Test manual freeze ends due to timeout expiration."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
@@ -1474,7 +1474,7 @@ async def test_freeze_thaw_timeout(
async def test_cannot_manually_thaw_normal_freeze(coresys: CoreSys):
"""Test thaw_all cannot be used unless freeze was started by freeze_all method."""
await coresys.core.set_state(CoreState.FREEZE)
coresys.core.state = CoreState.FREEZE
with pytest.raises(BackupError):
await coresys.backups.thaw_all()
@@ -1487,7 +1487,7 @@ async def test_restore_only_reloads_ingress_on_change(
):
"""Test restore only tells core to reload ingress when something has changed."""
install_addon_ssh.path_data.mkdir()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_no_ingress: Backup = await coresys.backups.do_backup_partial(
@@ -1547,7 +1547,7 @@ async def test_restore_new_addon(
path_extern,
):
"""Test restore installing new addon."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
assert not install_addon_example.path_data.exists()
@@ -1578,7 +1578,7 @@ async def test_restore_preserves_data_config(
path_extern,
):
"""Test restore preserves data and config."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
install_addon_example.path_data.mkdir()
@@ -1616,7 +1616,7 @@ async def test_backup_to_mount_bypasses_free_space_condition(
mock_is_mount,
):
"""Test backing up to a mount bypasses the check on local free space."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda _: 0.1
# These fail due to lack of local free space
@@ -1669,7 +1669,7 @@ async def test_skip_homeassistant_database(
path_extern,
):
"""Test exclude database option skips database in backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.jobs.ignore_conditions = [
JobCondition.INTERNET_HOST,
@@ -1812,7 +1812,7 @@ async def test_monitoring_after_full_restore(
coresys: CoreSys, full_backup_mock, install_addon_ssh, container
):
"""Test monitoring of addon state still works after full restore."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@@ -1833,7 +1833,7 @@ async def test_monitoring_after_partial_restore(
coresys: CoreSys, partial_backup_mock, install_addon_ssh, container
):
"""Test monitoring of addon state still works after full restore."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config()
@@ -1866,7 +1866,7 @@ async def test_core_pre_backup_actions_failed(
path_extern,
):
"""Test pre-backup actions failed in HA core stops backup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2024.7.0")
ha_ws_client.async_send_command.return_value = {
@@ -2028,7 +2028,7 @@ async def test_backup_remove_one_location_of_multiple(coresys: CoreSys):
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_addon_backup_excludes(coresys: CoreSys, install_addon_example: Addon):
"""Test backup excludes option for addons."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
install_addon_example.path_data.mkdir(parents=True)

View File

@@ -315,7 +315,6 @@ async def coresys(
with (
patch("supervisor.bootstrap.initialize_system"),
patch("supervisor.utils.sentry.sentry_sdk.init"),
patch("supervisor.core.Core._write_run_state"),
):
coresys_obj = await initialize_coresys()
@@ -390,7 +389,7 @@ async def coresys(
async def ha_ws_client(coresys: CoreSys) -> AsyncMock:
"""Return HA WS client mock for assertions."""
# Set Supervisor Core state to RUNNING, otherwise WS events won't be delivered
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
await asyncio.sleep(0)
client = coresys.homeassistant.websocket._client
client.async_send_command.reset_mock()
@@ -502,14 +501,10 @@ def store_manager(coresys: CoreSys):
@pytest.fixture
def run_supervisor_state(request: pytest.FixtureRequest) -> Generator[MagicMock]:
def run_supervisor_state() -> Generator[MagicMock]:
"""Fixture to simulate Supervisor state file in /run/supervisor."""
if getattr(request, "param", "test_file"):
with patch("supervisor.core.RUN_SUPERVISOR_STATE") as mock_run:
yield mock_run
else:
with patch("supervisor.core.Core._write_run_state") as mock_write_state:
yield mock_write_state
with patch("supervisor.core.RUN_SUPERVISOR_STATE") as mock_run:
yield mock_run
@pytest.fixture
@@ -521,7 +516,6 @@ def store_addon(coresys: CoreSys, tmp_path, repository):
coresys.store.data.addons[addon_obj.slug] = SCHEMA_ADDON_SYSTEM(
load_json_fixture("add-on.json")
)
coresys.store.data.addons[addon_obj.slug]["location"] = tmp_path
yield addon_obj

View File

@@ -87,13 +87,13 @@ def test_hide_virtual_device(coresys: CoreSys):
assert coresys.hardware.helper.hide_virtual_device(udev_device)
async def test_last_boot_error(coresys: CoreSys, caplog: LogCaptureFixture):
def test_last_boot_error(coresys: CoreSys, caplog: LogCaptureFixture):
"""Test error reading last boot."""
with patch(
"supervisor.hardware.helper.Path.read_text", side_effect=(err := OSError())
):
err.errno = errno.EBADMSG
assert await coresys.hardware.helper.last_boot() is None
assert coresys.hardware.helper.last_boot is None
assert coresys.core.healthy is True
assert "Can't read stat data" in caplog.text

View File

@@ -49,11 +49,11 @@ async def test_load(
assert coresys.homeassistant.secrets.secrets == {"hello": "world"}
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
await coresys.homeassistant.websocket.async_send_message({"lorem": "ipsum"})
ha_ws_client.async_send_command.assert_not_called()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
await asyncio.sleep(0)
assert ha_ws_client.async_send_command.call_args_list[0][0][0] == {"lorem": "ipsum"}
@@ -66,21 +66,21 @@ async def test_get_users_none(coresys: CoreSys, ha_ws_client: AsyncMock):
)
async def test_write_pulse_error(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
def test_write_pulse_error(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
"""Test errors writing pulse config."""
with patch(
"supervisor.homeassistant.module.Path.write_text",
side_effect=(err := OSError()),
):
err.errno = errno.EBUSY
await coresys.homeassistant.write_pulse()
coresys.homeassistant.write_pulse()
assert "can't write pulse/client.config" in caplog.text
assert coresys.core.healthy is True
caplog.clear()
err.errno = errno.EBADMSG
await coresys.homeassistant.write_pulse()
coresys.homeassistant.write_pulse()
assert "can't write pulse/client.config" in caplog.text
assert coresys.core.healthy is False

View File

@@ -57,14 +57,14 @@ async def test_send_command_old_core_version(
async def test_send_message_during_startup(coresys: CoreSys, ha_ws_client: AsyncMock):
"""Test websocket messages queue during startup."""
await coresys.homeassistant.websocket.load()
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
await coresys.homeassistant.websocket.async_supervisor_update_event(
"test", {"lorem": "ipsum"}
)
ha_ws_client.async_send_command.assert_not_called()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
await asyncio.sleep(0)
assert ha_ws_client.async_send_command.call_count == 2

View File

@@ -234,7 +234,7 @@ async def test_host_connectivity_disabled(
"""Test host connectivity check disabled."""
await coresys.host.network.load()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
await asyncio.sleep(0)
ha_ws_client.async_send_command.reset_mock()

View File

@@ -75,7 +75,7 @@ async def test_internet(
system_result: bool | None,
):
"""Test the internet decorator."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
reset_last_call(Supervisor.check_connectivity)
class TestClass:
@@ -241,10 +241,10 @@ async def test_running(coresys: CoreSys):
test = TestClass(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert await test.execute()
await coresys.core.set_state(CoreState.FREEZE)
coresys.core.state = CoreState.FREEZE
assert not await test.execute()
coresys.jobs.ignore_conditions = [JobCondition.RUNNING]
@@ -272,10 +272,10 @@ async def test_exception_conditions(coresys: CoreSys):
test = TestClass(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert await test.execute()
await coresys.core.set_state(CoreState.FREEZE)
coresys.core.state = CoreState.FREEZE
with pytest.raises(HassioError):
await test.execute()

View File

@@ -107,22 +107,22 @@ def test_is_dev(coresys):
assert filter_data(coresys, SAMPLE_EVENT, {}) is None
async def test_not_started(coresys):
def test_not_started(coresys):
"""Test if supervisor not fully started."""
coresys.config.diagnostics = True
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
assert filter_data(coresys, SAMPLE_EVENT, {}) == SAMPLE_EVENT
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert filter_data(coresys, SAMPLE_EVENT, {}) == SAMPLE_EVENT
async def test_defaults(coresys):
def test_defaults(coresys):
"""Test event defaults."""
coresys.config.diagnostics = True
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
filtered = filter_data(coresys, SAMPLE_EVENT, {})
@@ -135,12 +135,12 @@ async def test_defaults(coresys):
assert filtered["user"]["id"] == coresys.machine_id
async def test_sanitize_user_hostname(coresys):
def test_sanitize_user_hostname(coresys):
"""Test user hostname event sanitation."""
event = SAMPLE_EVENT_AIOHTTP_EXTERNAL
coresys.config.diagnostics = True
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
filtered = filter_data(coresys, event, {})
@@ -154,25 +154,25 @@ async def test_sanitize_user_hostname(coresys):
)
async def test_sanitize_internal(coresys):
def test_sanitize_internal(coresys):
"""Test internal event sanitation."""
event = SAMPLE_EVENT_AIOHTTP_INTERNAL
coresys.config.diagnostics = True
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
filtered = filter_data(coresys, event, {})
assert filtered == event
async def test_issues_on_report(coresys):
def test_issues_on_report(coresys):
"""Attach issue to report."""
coresys.resolution.create_issue(IssueType.FATAL_ERROR, ContextType.SYSTEM)
coresys.config.diagnostics = True
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
event = filter_data(coresys, SAMPLE_EVENT, {})
@@ -182,7 +182,7 @@ async def test_issues_on_report(coresys):
assert event["contexts"]["resolution"]["issues"][0]["context"] == ContextType.SYSTEM
async def test_suggestions_on_report(coresys):
def test_suggestions_on_report(coresys):
"""Attach suggestion to report."""
coresys.resolution.create_issue(
@@ -192,7 +192,7 @@ async def test_suggestions_on_report(coresys):
)
coresys.config.diagnostics = True
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
event = filter_data(coresys, SAMPLE_EVENT, {})
@@ -210,11 +210,11 @@ async def test_suggestions_on_report(coresys):
)
async def test_unhealthy_on_report(coresys):
def test_unhealthy_on_report(coresys):
"""Attach unhealthy to report."""
coresys.config.diagnostics = True
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.resolution.unhealthy = UnhealthyReason.DOCKER
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
@@ -224,11 +224,11 @@ async def test_unhealthy_on_report(coresys):
assert event["contexts"]["resolution"]["unhealthy"][-1] == UnhealthyReason.DOCKER
async def test_images_report(coresys):
def test_images_report(coresys):
"""Attach image to report."""
coresys.config.diagnostics = True
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.resolution.evaluate.cached_images.add("my/test:image")
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):

View File

@@ -7,7 +7,7 @@ from supervisor.const import CoreState
async def test_simple_task(coresys):
"""Schedule a simple task."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
trigger = []
async def test_task():
@@ -22,7 +22,7 @@ async def test_simple_task(coresys):
async def test_simple_task_repeat(coresys):
"""Schedule a simple task and repeat."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
trigger = []
async def test_task():
@@ -41,7 +41,7 @@ async def test_simple_task_repeat(coresys):
async def test_simple_task_shutdown(coresys):
"""Schedule a simple task with shudown."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
trigger = []
async def test_task():
@@ -62,7 +62,7 @@ async def test_simple_task_shutdown(coresys):
async def test_simple_task_repeat_block(coresys):
"""Schedule a simple task with repeat and block."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
trigger = []
async def test_task():

View File

@@ -177,7 +177,7 @@ async def test_reload_updater_triggers_supervisor_update(
):
"""Test an updater reload triggers a supervisor update if there is one."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.security.content_trust = False
version_data = load_fixture("version_stable.json")
@@ -218,7 +218,7 @@ async def test_core_backup_cleanup(
tasks: Tasks, coresys: CoreSys, tmp_supervisor_data: Path
):
"""Test core backup task cleans up old backup files."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Put an old and new backup in folder

View File

@@ -334,7 +334,7 @@ async def test_multiple_datadisk_add_remove_signals(
]
await coresys.os.datadisk.load()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert coresys.resolution.issues == []
assert coresys.resolution.suggestions == []
@@ -386,7 +386,7 @@ async def test_disabled_datadisk_add_remove_signals(
]
await coresys.os.datadisk.load()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert coresys.resolution.issues == []
assert coresys.resolution.suggestions == []

View File

@@ -67,7 +67,7 @@ def test_ota_url_os_name_rel_5_downgrade(coresys: CoreSys) -> None:
async def test_update_fails_if_out_of_date(coresys: CoreSys) -> None:
"""Test update of OS fails if Supervisor is out of date."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with (
patch.object(
type(coresys.supervisor), "need_update", new=PropertyMock(return_value=True)

View File

@@ -31,7 +31,7 @@ def fixture_mock_dns_query():
async def test_check_setup(coresys: CoreSys):
"""Test check for setup."""
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
with patch(
"supervisor.resolution.checks.free_space.CheckFreeSpace.run_check",
return_value=False,
@@ -42,7 +42,7 @@ async def test_check_setup(coresys: CoreSys):
async def test_check_running(coresys: CoreSys):
"""Test check for setup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with patch(
"supervisor.resolution.checks.free_space.CheckFreeSpace.run_check",
return_value=False,
@@ -54,7 +54,7 @@ async def test_check_running(coresys: CoreSys):
async def test_if_check_make_issue(coresys: CoreSys):
"""Test check for setup."""
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
@@ -66,7 +66,7 @@ async def test_if_check_make_issue(coresys: CoreSys):
async def test_if_check_cleanup_issue(coresys: CoreSys):
"""Test check for setup."""
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
@@ -82,7 +82,7 @@ async def test_if_check_cleanup_issue(coresys: CoreSys):
async def test_enable_disable_checks(coresys: CoreSys):
"""Test enable and disable check."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
free_space = coresys.resolution.check.get("free_space")
# Ensure the check was enabled

View File

@@ -29,7 +29,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys):
"""Test check."""
addon_pwned = CheckAddonPwned(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
addon = TestAddon()
coresys.addons.local[addon.slug] = addon
@@ -61,7 +61,7 @@ async def test_check(coresys: CoreSys):
async def test_approve(coresys: CoreSys):
"""Test check."""
addon_pwned = CheckAddonPwned(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
addon = TestAddon()
coresys.addons.local[addon.slug] = addon
@@ -82,7 +82,7 @@ async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.pwned = False
addon_pwned = CheckAddonPwned(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
addon = TestAddon()
coresys.addons.local[addon.slug] = addon
@@ -107,13 +107,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await addon_pwned()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await addon_pwned()
check.assert_not_called()
check.reset_mock()

View File

@@ -22,7 +22,7 @@ async def test_base(coresys: CoreSys):
async def test_check_no_backups(coresys: CoreSys):
"""Test check creates issue with no backups."""
backups = CheckBackups(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
await backups.run_check()
@@ -35,7 +35,7 @@ async def test_check_only_partial_backups(
):
"""Test check creates issue with only partial backups."""
backups = CheckBackups(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
await backups.run_check()
@@ -46,7 +46,7 @@ async def test_check_only_partial_backups(
async def test_check_with_backup(coresys: CoreSys, mock_full_backup: Backup):
"""Test check only creates issue if full backup not current."""
backups = CheckBackups(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
await backups.run_check()
@@ -72,13 +72,13 @@ async def test_did_run(coresys: CoreSys):
return_value=False,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await backups()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await backups()
check.assert_not_called()
check.reset_mock()

View File

@@ -23,7 +23,7 @@ async def test_check(coresys: CoreSys, tmp_path):
"""Test check."""
with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path):
core_security = CheckCoreSecurity(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
@@ -57,7 +57,7 @@ async def test_approve(coresys: CoreSys, tmp_path):
"""Test check."""
with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path):
core_security = CheckCoreSecurity(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.homeassistant._data["version"] = None
assert await core_security.approve_check()
@@ -84,13 +84,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await core_security()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await core_security()
check.assert_not_called()
check.reset_mock()

View File

@@ -21,7 +21,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, install_addon_ssh: Addon):
"""Test check for detached addons."""
detached_addon_missing = CheckDetachedAddonMissing(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
await detached_addon_missing()
assert len(coresys.resolution.issues) == 0
@@ -44,7 +44,7 @@ async def test_check(coresys: CoreSys, install_addon_ssh: Addon):
async def test_approve(coresys: CoreSys, install_addon_ssh: Addon):
"""Test approve existing detached addon issues."""
detached_addon_missing = CheckDetachedAddonMissing(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert (
await detached_addon_missing.approve_check(reference=install_addon_ssh.slug)
@@ -75,13 +75,13 @@ async def test_did_run(coresys: CoreSys):
CheckDetachedAddonMissing, "run_check", return_value=None
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await detached_addon_missing()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await detached_addon_missing()
check.assert_not_called()
check.reset_mock()

View File

@@ -25,7 +25,7 @@ async def test_check(
):
"""Test check for detached addons."""
detached_addon_removed = CheckDetachedAddonRemoved(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
await detached_addon_removed()
assert len(coresys.resolution.issues) == 0
@@ -55,7 +55,7 @@ async def test_approve(
):
"""Test approve existing detached addon issues."""
detached_addon_removed = CheckDetachedAddonRemoved(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert (
await detached_addon_removed.approve_check(reference=install_addon_ssh.slug)
@@ -86,13 +86,13 @@ async def test_did_run(coresys: CoreSys):
CheckDetachedAddonRemoved, "run_check", return_value=None
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await detached_addon_removed()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await detached_addon_removed()
check.assert_not_called()
check.reset_mock()

View File

@@ -36,7 +36,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
"""Test check."""
disabled_data_disk = CheckDisabledDataDisk(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
await disabled_data_disk.run_check()
@@ -64,7 +64,7 @@ async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
async def test_approve(coresys: CoreSys, sda1_block_service: BlockService):
"""Test approve."""
disabled_data_disk = CheckDisabledDataDisk(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert not await disabled_data_disk.approve_check(reference="/dev/sda1")
@@ -88,13 +88,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await disabled_data_disk()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await disabled_data_disk()
check.assert_not_called()
check.reset_mock()

View File

@@ -31,7 +31,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception: Mock):
"""Test check for DNS server failures."""
dns_server = CheckDNSServer(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.plugins.dns.servers = ["dns://1.1.1.1"]
assert dns_server.dns_servers == [
@@ -65,7 +65,7 @@ async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception:
async def test_approve(coresys: CoreSys, dns_query: AsyncMock):
"""Test approve existing DNS Server failure issues."""
dns_server = CheckDNSServer(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert dns_server.dns_servers == ["dns://192.168.30.1"]
dns_query.side_effect = DNSError()
@@ -92,13 +92,13 @@ async def test_did_run(coresys: CoreSys):
with patch.object(CheckDNSServer, "run_check", return_value=None) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await dns_server()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await dns_server()
check.assert_not_called()
check.reset_mock()
@@ -107,7 +107,7 @@ async def test_did_run(coresys: CoreSys):
async def test_check_if_affected(coresys: CoreSys):
"""Test that check is still executed even if already affected."""
dns_server = CheckDNSServer(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.resolution.create_issue(
IssueType.DNS_SERVER_FAILED,

View File

@@ -31,7 +31,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception: Mock):
"""Test check for DNS server IPv6 errors."""
dns_server_ipv6 = CheckDNSServerIPv6(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.plugins.dns.servers = ["dns://1.1.1.1"]
assert dns_server_ipv6.dns_servers == [
@@ -71,7 +71,7 @@ async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception:
async def test_approve(coresys: CoreSys, dns_query: AsyncMock):
"""Test approve existing DNS Server IPv6 error issues."""
dns_server_ipv6 = CheckDNSServerIPv6(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert dns_server_ipv6.dns_servers == ["dns://192.168.30.1"]
dns_query.side_effect = DNSError(4, "Domain name not found")
@@ -103,13 +103,13 @@ async def test_did_run(coresys: CoreSys):
with patch.object(CheckDNSServerIPv6, "run_check", return_value=None) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await dns_server_ipv6()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await dns_server_ipv6()
check.assert_not_called()
check.reset_mock()
@@ -118,7 +118,7 @@ async def test_did_run(coresys: CoreSys):
async def test_check_if_affected(coresys: CoreSys):
"""Test that check is still executed even if already affected."""
dns_server_ipv6 = CheckDNSServerIPv6(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.resolution.create_issue(
IssueType.DNS_SERVER_IPV6_ERROR,

View File

@@ -58,7 +58,7 @@ async def test_check(
await coresys.addons.load()
docker_config = CheckDockerConfig(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert not coresys.resolution.issues
assert not coresys.resolution.suggestions
@@ -131,13 +131,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await docker_config()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await docker_config()
check.assert_not_called()
check.reset_mock()

View File

@@ -45,7 +45,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, suggestion: SuggestionType | None):
"""Test check."""
free_space = CheckFreeSpace(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
@@ -68,7 +68,7 @@ async def test_check(coresys: CoreSys, suggestion: SuggestionType | None):
async def test_approve(coresys: CoreSys):
"""Test check."""
free_space = CheckFreeSpace(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
assert await free_space.approve_check()
@@ -90,13 +90,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await free_space()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await free_space()
check.assert_not_called()
check.reset_mock()

View File

@@ -36,7 +36,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
"""Test check."""
multiple_data_disks = CheckMultipleDataDisks(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
await multiple_data_disks.run_check()
@@ -64,7 +64,7 @@ async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
async def test_approve(coresys: CoreSys, sda1_block_service: BlockService):
"""Test approve."""
multiple_data_disks = CheckMultipleDataDisks(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert not await multiple_data_disks.approve_check(reference="/dev/sda1")
@@ -88,13 +88,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await multiple_data_disks()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await multiple_data_disks()
check.assert_not_called()
check.reset_mock()

View File

@@ -36,7 +36,7 @@ async def test_check(
):
"""Test check."""
network_interface = CheckNetworkInterfaceIPV4(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
@@ -65,7 +65,7 @@ async def test_approve(
):
"""Test check."""
network_interface = CheckNetworkInterfaceIPV4(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert not await network_interface.approve_check("eth0")
@@ -89,13 +89,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await network_interface()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await network_interface()
check.assert_not_called()
check.reset_mock()

View File

@@ -20,7 +20,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
@@ -47,7 +47,7 @@ async def test_check(coresys: CoreSys):
async def test_approve(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await supervisor_trust.approve_check()
@@ -60,7 +60,7 @@ async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
@@ -84,13 +84,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await supervisor_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await supervisor_trust()
check.assert_not_called()
check.reset_mock()

View File

@@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.connectivity_check import (
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
connectivity_check = EvaluateConnectivityCheck(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert connectivity_check.reason not in coresys.resolution.unsupported
@@ -46,13 +46,13 @@ async def test_did_run(coresys: CoreSys):
return_value=False,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await connectivity_check()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await connectivity_check()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.apparmor import EvaluateAppArmor
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
apparmor = EvaluateAppArmor(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
assert apparmor.reason not in coresys.resolution.unsupported
@@ -42,13 +42,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await apparmor()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await apparmor()
evaluate.assert_not_called()
evaluate.reset_mock()
@@ -57,7 +57,7 @@ async def test_did_run(coresys: CoreSys):
async def test_evaluation_error(coresys: CoreSys):
"""Test error reading file during evaluation."""
apparmor = EvaluateAppArmor(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
assert apparmor.reason not in coresys.resolution.unsupported

View File

@@ -15,7 +15,7 @@ from supervisor.resolution.evaluations.cgroup import (
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
cgroup_version = EvaluateCGroupVersion(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert cgroup_version.reason not in coresys.resolution.unsupported
@@ -37,7 +37,7 @@ async def test_evaluation(coresys: CoreSys):
async def test_evaluation_os_available(coresys: CoreSys, os_available):
"""Test evaluation with OS available."""
cgroup_version = EvaluateCGroupVersion(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
coresys.docker.info.cgroup = CGROUP_V2_VERSION
await cgroup_version()
@@ -61,13 +61,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await cgroup_version()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await cgroup_version()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -25,7 +25,7 @@ def _make_image_attr(image: str) -> MagicMock:
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
container = EvaluateContainer(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert container.reason not in coresys.resolution.unsupported
assert UnhealthyReason.DOCKER not in coresys.resolution.unhealthy
@@ -57,7 +57,7 @@ async def test_evaluation(coresys: CoreSys):
async def test_corrupt_docker(coresys: CoreSys):
"""Test corrupt docker issue."""
container = EvaluateContainer(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
corrupt_docker = Issue(IssueType.CORRUPT_DOCKER, ContextType.SYSTEM)
assert corrupt_docker not in coresys.resolution.issues
@@ -80,13 +80,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await container()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await container()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.content_trust import EvaluateContentTrust
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
job_conditions = EvaluateContentTrust(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
await job_conditions()
assert job_conditions.reason not in coresys.resolution.unsupported
@@ -34,13 +34,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await job_conditions()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await job_conditions()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.dbus import EvaluateDbus
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
dbus = EvaluateDbus(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
assert dbus.reason not in coresys.resolution.unsupported
@@ -37,13 +37,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await dbus()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await dbus()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.dns_server import EvaluateDNSServer
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
dns_server = EvaluateDNSServer(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert dns_server.reason not in coresys.resolution.unsupported
assert coresys.plugins.dns.fallback is True
@@ -48,13 +48,13 @@ async def test_did_run(coresys: CoreSys):
with patch.object(EvaluateDNSServer, "evaluate", return_value=None) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await dns_server()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await dns_server()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -15,7 +15,7 @@ from supervisor.resolution.evaluations.docker_configuration import (
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
docker_configuration = EvaluateDockerConfiguration(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
assert docker_configuration.reason not in coresys.resolution.unsupported
@@ -50,13 +50,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await docker_configuration()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await docker_configuration()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.docker_version import EvaluateDockerVersi
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
docker_version = EvaluateDockerVersion(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
assert docker_version.reason not in coresys.resolution.unsupported
@@ -37,13 +37,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await docker_version()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await docker_version()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.job_conditions import EvaluateJobConditio
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
job_conditions = EvaluateJobConditions(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
await job_conditions()
assert job_conditions.reason not in coresys.resolution.unsupported
@@ -35,13 +35,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await job_conditions()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await job_conditions()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.lxc import EvaluateLxc
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
lxc = EvaluateLxc(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
assert lxc.reason not in coresys.resolution.unsupported
@@ -40,13 +40,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await lxc()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await lxc()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.network_manager import EvaluateNetworkMan
async def test_evaluation(coresys: CoreSys, dbus_is_connected):
"""Test evaluation."""
network_manager = EvaluateNetworkManager(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert network_manager.reason not in coresys.resolution.unsupported
@@ -39,13 +39,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await network_manager()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await network_manager()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -14,7 +14,7 @@ from supervisor.resolution.evaluations.operating_system import (
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
operating_system = EvaluateOperatingSystem(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert operating_system.reason not in coresys.resolution.unsupported
@@ -45,13 +45,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await operating_system()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await operating_system()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.os_agent import EvaluateOSAgent
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
agent = EvaluateOSAgent(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert agent.reason not in coresys.resolution.unsupported
@@ -42,13 +42,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await agent()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await agent()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.privileged import EvaluatePrivileged
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
privileged = EvaluatePrivileged(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
assert privileged.reason not in coresys.resolution.unsupported
@@ -37,13 +37,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await privileged()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await privileged()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -10,7 +10,7 @@ from supervisor.resolution.evaluations.resolved import EvaluateResolved
async def test_evaluation(coresys: CoreSys, dbus_is_connected):
"""Test evaluation."""
resolved = EvaluateResolved(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert resolved.reason not in coresys.resolution.unsupported
@@ -36,13 +36,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await resolved()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await resolved()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -21,7 +21,7 @@ async def test_evaluation(coresys: CoreSys):
Path(f"{os.getcwd()}/supervisor"),
):
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert sourcemods.reason not in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
@@ -50,13 +50,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await sourcemods()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await sourcemods()
evaluate.assert_not_called()
evaluate.reset_mock()
@@ -65,7 +65,7 @@ async def test_did_run(coresys: CoreSys):
async def test_evaluation_error(coresys: CoreSys):
"""Test error reading file during evaluation."""
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM)
assert sourcemods.reason not in coresys.resolution.unsupported

View File

@@ -14,7 +14,7 @@ async def test_evaluation(coresys: CoreSys):
need_update_mock = PropertyMock()
with patch.object(type(coresys.supervisor), "need_update", new=need_update_mock):
supervisor_version = EvaluateSupervisorVersion(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
need_update_mock.return_value = False
# Only unsupported if out of date and auto update is off
@@ -41,13 +41,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await supervisor_version()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await supervisor_version()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.systemd import EvaluateSystemd
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
systemd = EvaluateSystemd(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert systemd.reason not in coresys.resolution.unsupported
@@ -46,13 +46,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await systemd()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await systemd()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.systemd_journal import EvaluateSystemdJou
async def test_evaluation(coresys: CoreSys, journald_gateway: MagicMock):
"""Test evaluation."""
systemd_journal = EvaluateSystemdJournal(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
assert systemd_journal.reason not in coresys.resolution.unsupported
@@ -38,13 +38,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await systemd_journal()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await systemd_journal()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -10,7 +10,7 @@ from supervisor.resolution.const import UnsupportedReason
async def test_evaluation_initialize(coresys: CoreSys):
"""Test evaluation for initialize."""
await coresys.core.set_state(CoreState.INITIALIZE)
coresys.core.state = CoreState.INITIALIZE
with (
patch(
"supervisor.resolution.evaluations.dbus.EvaluateDbus.evaluate",
@@ -43,7 +43,7 @@ async def test_evaluation_initialize(coresys: CoreSys):
async def test_evaluation_setup(coresys: CoreSys):
"""Test evaluation for setup."""
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
with (
patch(
"supervisor.resolution.evaluations.operating_system.EvaluateOperatingSystem.evaluate",
@@ -66,7 +66,7 @@ async def test_evaluation_setup(coresys: CoreSys):
async def test_evaluation_running(coresys: CoreSys):
"""Test evaluation for running."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with (
patch(
"supervisor.resolution.evaluations.container.EvaluateContainer.evaluate",
@@ -84,7 +84,7 @@ async def test_evaluation_running(coresys: CoreSys):
async def test_adding_and_removing_unsupported_reason(coresys: CoreSys):
"""Test adding and removing unsupported reason."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert UnsupportedReason.NETWORK_MANAGER not in coresys.resolution.unsupported
with patch(

View File

@@ -17,7 +17,7 @@ TEST_VERSION = AwesomeVersion("1.0.0")
async def test_evaluation(coresys: CoreSys, install_addon_ssh: Addon):
"""Test evaluation."""
restart_policy = EvaluateRestartPolicy(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
await restart_policy()
assert restart_policy.reason not in coresys.resolution.unsupported
@@ -69,13 +69,13 @@ async def test_did_run(coresys: CoreSys):
return_value=False,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await restart_policy()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await restart_policy()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -21,7 +21,7 @@ async def test_evaluation(
"""Test evaluation."""
systemd_service: SystemdService = all_dbus_services["systemd"]
virtualization = EvaluateVirtualizationImage(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
with patch(
"supervisor.os.manager.CPE.get_target_hardware", return_value=["generic-x86-64"]
@@ -53,7 +53,7 @@ async def test_evaluation_supported_images(
"""Test supported images for virtualization do not trigger unsupported."""
systemd_service: SystemdService = all_dbus_services["systemd"]
virtualization = EvaluateVirtualizationImage(coresys)
await coresys.core.set_state(CoreState.SETUP)
coresys.core.state = CoreState.SETUP
with patch("supervisor.os.manager.CPE.get_target_hardware", return_value=[board]):
systemd_service.virtualization = "vmware"
@@ -77,13 +77,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
coresys.core.state = state
await virtualization()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
coresys.core.state = state
await virtualization()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -12,7 +12,7 @@ from supervisor.resolution.validate import get_valid_modules
async def test_check_autofix(coresys: CoreSys):
"""Test check for setup."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
coresys.resolution.fixup._fixups[
"system_create_full_backup"

View File

@@ -10,7 +10,7 @@ from supervisor.utils import check_exception_chain
async def test_check_system_error(coresys: CoreSys, capture_exception: Mock):
"""Test error while checking system."""
await coresys.core.set_state(CoreState.STARTUP)
coresys.core.state = CoreState.STARTUP
with (
patch.object(CheckCoreSecurity, "run_check", side_effect=ValueError),

View File

@@ -9,7 +9,7 @@ from supervisor.utils import check_exception_chain
async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock):
"""Test error while evaluating system."""
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
with patch(
"supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode",

View File

@@ -5,7 +5,7 @@ import datetime
import errno
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
import pytest
from pytest import LogCaptureFixture
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
@@ -16,18 +16,15 @@ from supervisor.supervisor import Supervisor
from supervisor.utils.whoami import WhoamiData
@pytest.mark.parametrize("run_supervisor_state", ["test_file"], indirect=True)
async def test_write_state(run_supervisor_state: MagicMock, coresys: CoreSys):
def test_write_state(run_supervisor_state, coresys: CoreSys):
"""Test write corestate to /run/supervisor."""
run_supervisor_state.reset_mock()
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
run_supervisor_state.write_text.assert_called_with(
str(CoreState.RUNNING), encoding="utf-8"
)
await coresys.core.set_state(CoreState.SHUTDOWN)
coresys.core.state = CoreState.SHUTDOWN
run_supervisor_state.write_text.assert_called_with(
str(CoreState.SHUTDOWN), encoding="utf-8"
@@ -90,14 +87,14 @@ async def test_adjust_system_datetime_if_time_behind(coresys: CoreSys):
mock_check_connectivity.assert_called_once()
async def test_write_state_failure(
run_supervisor_state: MagicMock, coresys: CoreSys, caplog: pytest.LogCaptureFixture
def test_write_state_failure(
run_supervisor_state: MagicMock, coresys: CoreSys, caplog: LogCaptureFixture
):
"""Test failure to write corestate to /run/supervisor."""
err = OSError()
err.errno = errno.EBADMSG
run_supervisor_state.write_text.side_effect = err
await coresys.core.set_state(CoreState.RUNNING)
coresys.core.state = CoreState.RUNNING
assert "Can't update the Supervisor state" in caplog.text
assert coresys.core.state == CoreState.RUNNING

View File

@@ -1,13 +1,13 @@
"""Tests for message formater."""
from supervisor.utils.log_format import format_message
from supervisor.utils.log_format import async_format_message
def test_format_message_port():
"""Tests for message formater."""
message = '500 Server Error: Internal Server Error: Bind for 0.0.0.0:80 failed: port is already allocated")'
assert (
format_message(message)
async_format_message(message)
== "Port '80' is already in use by something else on the host."
)
@@ -16,6 +16,12 @@ def test_format_message_port_alternative():
"""Tests for message formater."""
message = 'Error starting userland proxy: listen tcp 0.0.0.0:80: bind: address already in use")'
assert (
format_message(message)
async_format_message(message)
== "Port '80' is already in use by something else on the host."
)
async def test_exeption():
"""Tests the exception handling."""
message = b"byte"
assert async_format_message(message) == message