Compare commits


4 Commits

Author SHA1 Message Date
Stefan Agner
febfaf8db1 Make sure Core returns a valid config 2025-11-13 17:49:41 +01:00
Stefan Agner
5d3a568b48 Improve pytest tests 2025-11-13 16:08:09 +01:00
Stefan Agner
894f8ea226 Avoid checking frontend if config data is None 2025-11-13 16:03:25 +01:00
Stefan Agner
96e6c0b15b Check frontend availability after Home Assistant Core updates
Add verification that the frontend is actually accessible at "/" after core
updates to ensure the web interface is serving properly, not just that the
API endpoints respond.

Previously, the update verification only checked API endpoints and whether
the frontend component was loaded. This could miss cases where the API is
responsive but the frontend fails to serve the UI.

Changes:
- Add check_frontend_available() method to HomeAssistantAPI that fetches
  the root path and verifies it returns HTML content
- Integrate frontend check into core update verification flow after
  confirming the frontend component is loaded
- Trigger automatic rollback if frontend is inaccessible after update
- Fix blocking I/O calls in rollback log file handling to use async
  executor

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-13 09:51:38 +01:00
28 changed files with 483 additions and 434 deletions
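The check described in the last commit above is essentially a reachability probe of the frontend's root path. Below is a minimal standalone sketch of the same idea using plain aiohttp against a placeholder URL, rather than the Supervisor's `HomeAssistantAPI.make_request` plumbing shown in the diffs further down; the function name and URL here are illustrative only.

```python
import asyncio

import aiohttp


async def check_frontend_available(base_url: str) -> bool:
    """Return True if GET on the root path serves an HTML document."""
    try:
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        ) as session:
            async with session.get(base_url) as resp:
                if resp.status != 200:
                    # Anything but 200 means the UI is not being served
                    return False
                # The frontend is expected to answer "/" with HTML
                return "text/html" in resp.headers.get("Content-Type", "")
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # Connection refused, DNS failure, timeout -> not reachable
        return False


if __name__ == "__main__":
    # Placeholder URL; point this at a real Home Assistant instance to try it.
    print(asyncio.run(check_frontend_available("http://homeassistant.local:8123/")))
```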

View File

@@ -53,7 +53,7 @@ jobs:
requirements: ${{ steps.requirements.outputs.changed }}
steps:
- name: Checkout the repository
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
with:
fetch-depth: 0
@@ -92,7 +92,7 @@ jobs:
arch: ${{ fromJson(needs.init.outputs.architectures) }}
steps:
- name: Checkout the repository
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
with:
fetch-depth: 0
@@ -178,7 +178,7 @@ jobs:
steps:
- name: Checkout the repository
if: needs.init.outputs.publish == 'true'
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Initialize git
if: needs.init.outputs.publish == 'true'
@@ -203,7 +203,7 @@ jobs:
timeout-minutes: 60
steps:
- name: Checkout the repository
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
# home-assistant/builder doesn't support sha pinning
- name: Build the Supervisor

View File

@@ -26,7 +26,7 @@ jobs:
name: Prepare Python dependencies
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python
id: python
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
@@ -68,7 +68,7 @@ jobs:
needs: prepare
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
id: python
@@ -111,7 +111,7 @@ jobs:
needs: prepare
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
id: python
@@ -154,7 +154,7 @@ jobs:
needs: prepare
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Register hadolint problem matcher
run: |
echo "::add-matcher::.github/workflows/matchers/hadolint.json"
@@ -169,7 +169,7 @@ jobs:
needs: prepare
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
id: python
@@ -213,7 +213,7 @@ jobs:
needs: prepare
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
id: python
@@ -257,7 +257,7 @@ jobs:
needs: prepare
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
id: python
@@ -293,7 +293,7 @@ jobs:
needs: prepare
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
id: python
@@ -339,7 +339,7 @@ jobs:
name: Run tests Python ${{ needs.prepare.outputs.python-version }}
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
id: python
@@ -398,7 +398,7 @@ jobs:
needs: ["pytest", "prepare"]
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
id: python

View File

@@ -11,7 +11,7 @@ jobs:
name: Release Drafter
steps:
- name: Checkout the repository
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
with:
fetch-depth: 0

View File

@@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out code from GitHub
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Sentry Release
uses: getsentry/action-release@128c5058bbbe93c8e02147fe0a9c713f166259a6 # v3.4.0
env:

View File

@@ -14,7 +14,7 @@ jobs:
latest_version: ${{ steps.latest_frontend_version.outputs.latest_tag }}
steps:
- name: Checkout code
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Get latest frontend release
id: latest_frontend_version
uses: abatilo/release-info-action@32cb932219f1cee3fc4f4a298fd65ead5d35b661 # v1.3.3
@@ -49,7 +49,7 @@ jobs:
if: needs.check-version.outputs.skip != 'true'
steps:
- name: Checkout code
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Clear www folder
run: |
rm -rf supervisor/api/panel/*

View File

@@ -25,7 +25,7 @@ pyudev==0.24.4
PyYAML==6.0.3
requests==2.32.5
securetar==2025.2.1
sentry-sdk==2.45.0
sentry-sdk==2.44.0
setuptools==80.9.0
voluptuous==0.15.2
dbus-fast==2.45.1

View File

@@ -1,15 +1,15 @@
astroid==4.0.2
coverage==7.12.0
coverage==7.11.3
mypy==1.18.2
pre-commit==4.4.0
pylint==4.0.3
pylint==4.0.2
pytest-aiohttp==1.1.0
pytest-asyncio==1.3.0
pytest-cov==7.0.0
pytest-timeout==2.4.0
pytest==9.0.1
ruff==0.14.5
time-machine==3.0.0
ruff==0.14.4
time-machine==2.19.0
types-docker==7.1.0.20251009
types-pyyaml==6.0.12.20250915
types-requests==2.32.4.20250913

View File

@@ -152,7 +152,6 @@ class RestAPI(CoreSysAttributes):
self._api_host.advanced_logs,
identifier=syslog_identifier,
latest=True,
no_colors=True,
),
),
web.get(
@@ -450,7 +449,6 @@ class RestAPI(CoreSysAttributes):
await async_capture_exception(err)
kwargs.pop("follow", None) # Follow is not supported for Docker logs
kwargs.pop("latest", None) # Latest is not supported for Docker logs
kwargs.pop("no_colors", None) # no_colors not supported for Docker logs
return await api_supervisor.logs(*args, **kwargs)
self.webapp.add_routes(
@@ -462,7 +460,7 @@ class RestAPI(CoreSysAttributes):
),
web.get(
"/supervisor/logs/latest",
partial(get_supervisor_logs, latest=True, no_colors=True),
partial(get_supervisor_logs, latest=True),
),
web.get("/supervisor/logs/boots/{bootid}", get_supervisor_logs),
web.get(
@@ -578,7 +576,7 @@ class RestAPI(CoreSysAttributes):
),
web.get(
"/addons/{addon}/logs/latest",
partial(get_addon_logs, latest=True, no_colors=True),
partial(get_addon_logs, latest=True),
),
web.get("/addons/{addon}/logs/boots/{bootid}", get_addon_logs),
web.get(

View File

@@ -206,7 +206,6 @@ class APIHost(CoreSysAttributes):
identifier: str | None = None,
follow: bool = False,
latest: bool = False,
no_colors: bool = False,
) -> web.StreamResponse:
"""Return systemd-journald logs."""
log_formatter = LogFormatter.PLAIN
@@ -281,9 +280,7 @@ class APIHost(CoreSysAttributes):
response = web.StreamResponse()
response.content_type = CONTENT_TYPE_TEXT
headers_returned = False
async for cursor, line in journal_logs_reader(
resp, log_formatter, no_colors
):
async for cursor, line in journal_logs_reader(resp, log_formatter):
try:
if not headers_returned:
if cursor:
@@ -321,12 +318,9 @@ class APIHost(CoreSysAttributes):
identifier: str | None = None,
follow: bool = False,
latest: bool = False,
no_colors: bool = False,
) -> web.StreamResponse:
"""Return systemd-journald logs. Wrapped as standard API handler."""
return await self.advanced_logs_handler(
request, identifier, follow, latest, no_colors
)
return await self.advanced_logs_handler(request, identifier, follow, latest)
@api_process
async def disk_usage(self, request: web.Request) -> dict:

View File

@@ -9,7 +9,6 @@ from datetime import UTC, datetime, tzinfo
from functools import partial
import logging
import os
import time
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Self, TypeVar
@@ -656,14 +655,8 @@ class CoreSys:
if kwargs:
funct = partial(funct, **kwargs)
# Convert datetime to event loop time base
# If datetime is in the past, delay will be negative and call_at will
# schedule the call as soon as possible.
delay = when.timestamp() - time.time()
loop_time = self.loop.time() + delay
return self.loop.call_at(
loop_time, funct, *args, context=self._create_context()
when.timestamp(), funct, *args, context=self._create_context()
)
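For context on this hunk: `loop.call_at()` takes a deadline on the event loop's own monotonic clock, not a Unix timestamp, so one side of the change converts the wall-clock `datetime` into loop time via the current offset before scheduling. A standalone sketch of that conversion, with a hypothetical callback outside the Supervisor codebase:

```python
import asyncio
import time
from datetime import datetime, timedelta


def schedule_at(
    loop: asyncio.AbstractEventLoop, when: datetime, callback, *args
) -> asyncio.TimerHandle:
    """Schedule callback for a wall-clock datetime on an asyncio loop."""
    # loop.time() is a monotonic clock unrelated to time.time(), so the
    # wall-clock target is converted into loop time via the delay. If `when`
    # is already in the past the delay is negative and call_at fires the
    # callback as soon as possible.
    delay = when.timestamp() - time.time()
    return loop.call_at(loop.time() + delay, callback, *args)


async def main() -> None:
    loop = asyncio.get_running_loop()
    fired = asyncio.Event()
    schedule_at(loop, datetime.now() + timedelta(seconds=0.2), fired.set)
    await fired.wait()


if __name__ == "__main__":
    asyncio.run(main())
```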

View File

@@ -310,8 +310,6 @@ class DockerInterface(JobGroup, ABC):
if (
stage in {PullImageLayerStage.DOWNLOADING, PullImageLayerStage.EXTRACTING}
and reference.progress_detail
and reference.progress_detail.current is not None
and reference.progress_detail.total is not None
):
job.update(
progress=progress,

View File

@@ -175,7 +175,10 @@ class HomeAssistantAPI(CoreSysAttributes):
async def get_config(self) -> dict[str, Any]:
"""Return Home Assistant config."""
return await self._get_json("api/config")
config = await self._get_json("api/config")
if config is None or not isinstance(config, dict):
raise HomeAssistantAPIError("No config received from Home Assistant API")
return config
async def get_core_state(self) -> dict[str, Any]:
"""Return Home Assistant core state."""
@@ -219,3 +222,36 @@ class HomeAssistantAPI(CoreSysAttributes):
if state := await self.get_api_state():
return state.core_state == "RUNNING" or state.offline_db_migration
return False
async def check_frontend_available(self) -> bool:
"""Check if the frontend is accessible by fetching the root path.
Returns:
True if the frontend responds successfully, False otherwise.
"""
# Skip check on landingpage
if (
self.sys_homeassistant.version is None
or self.sys_homeassistant.version == LANDINGPAGE
):
return False
try:
async with self.make_request("get", "", timeout=30) as resp:
# Frontend should return HTML content
if resp.status == 200:
content_type = resp.headers.get(hdrs.CONTENT_TYPE, "")
if "text/html" in content_type:
_LOGGER.debug("Frontend is accessible and serving HTML")
return True
_LOGGER.warning(
"Frontend responded but with unexpected content type: %s",
content_type,
)
return False
_LOGGER.warning("Frontend returned status %s", resp.status)
return False
except HomeAssistantAPIError as err:
_LOGGER.debug("Cannot reach frontend: %s", err)
return False

View File

@@ -303,12 +303,18 @@ class HomeAssistantCore(JobGroup):
except HomeAssistantError:
# The API stopped responding between the up checks and now
self._error_state = True
data = None
return
# Verify that the frontend is loaded
if data and "frontend" not in data.get("components", []):
if "frontend" not in data.get("components", []):
_LOGGER.error("API responds but frontend is not loaded")
self._error_state = True
# Check that the frontend is actually accessible
elif not await self.sys_homeassistant.api.check_frontend_available():
_LOGGER.error(
"Frontend component loaded but frontend is not accessible"
)
self._error_state = True
else:
return
@@ -321,12 +327,12 @@ class HomeAssistantCore(JobGroup):
# Make a copy of the current log file if it exists
logfile = self.sys_config.path_homeassistant / "home-assistant.log"
if logfile.exists():
if await self.sys_run_in_executor(logfile.exists):
rollback_log = (
self.sys_config.path_homeassistant / "home-assistant-rollback.log"
)
shutil.copy(logfile, rollback_log)
await self.sys_run_in_executor(shutil.copy, logfile, rollback_log)
_LOGGER.info(
"A backup of the logfile is stored in /config/home-assistant-rollback.log"
)
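The rollback log handling in this hunk moves `logfile.exists()` and `shutil.copy()` off the event loop via the Supervisor's `sys_run_in_executor` helper. The same pattern with stock asyncio, as a standalone sketch (file names are placeholders):

```python
import asyncio
import shutil
from pathlib import Path


async def backup_logfile(logfile: Path, rollback_log: Path) -> bool:
    """Copy a log file without blocking the event loop; True if a copy was made."""
    loop = asyncio.get_running_loop()
    # Path.exists() and shutil.copy() both touch the filesystem, so run them
    # in the default thread-pool executor instead of on the event loop.
    if not await loop.run_in_executor(None, logfile.exists):
        return False
    await loop.run_in_executor(None, shutil.copy, logfile, rollback_log)
    return True


if __name__ == "__main__":
    print(asyncio.run(backup_logfile(
        Path("home-assistant.log"), Path("home-assistant-rollback.log")
    )))
```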

View File

@@ -5,20 +5,12 @@ from collections.abc import AsyncGenerator
from datetime import UTC, datetime
from functools import wraps
import json
import re
from aiohttp import ClientResponse
from supervisor.exceptions import MalformedBinaryEntryError
from supervisor.host.const import LogFormatter
_RE_ANSI_CSI_COLORS_PATTERN = re.compile(r"\x1B\[[0-9;]*m")
def _strip_ansi_colors(message: str) -> str:
"""Remove ANSI color codes from a message string."""
return _RE_ANSI_CSI_COLORS_PATTERN.sub("", message)
def formatter(required_fields: list[str]):
"""Decorate journal entry formatters with list of required fields.
@@ -39,9 +31,9 @@ def formatter(required_fields: list[str]):
@formatter(["MESSAGE"])
def journal_plain_formatter(entries: dict[str, str], no_colors: bool = False) -> str:
def journal_plain_formatter(entries: dict[str, str]) -> str:
"""Format parsed journal entries as a plain message."""
return _strip_ansi_colors(entries["MESSAGE"]) if no_colors else entries["MESSAGE"]
return entries["MESSAGE"]
@formatter(
@@ -53,7 +45,7 @@ def journal_plain_formatter(entries: dict[str, str], no_colors: bool = False) ->
"MESSAGE",
]
)
def journal_verbose_formatter(entries: dict[str, str], no_colors: bool = False) -> str:
def journal_verbose_formatter(entries: dict[str, str]) -> str:
"""Format parsed journal entries to a journalctl-like format."""
ts = datetime.fromtimestamp(
int(entries["__REALTIME_TIMESTAMP"]) / 1e6, UTC
@@ -66,24 +58,14 @@ def journal_verbose_formatter(entries: dict[str, str], no_colors: bool = False)
else entries.get("SYSLOG_IDENTIFIER", "_UNKNOWN_")
)
message = (
_strip_ansi_colors(entries.get("MESSAGE", ""))
if no_colors
else entries.get("MESSAGE", "")
)
return f"{ts} {entries.get('_HOSTNAME', '')} {identifier}: {message}"
return f"{ts} {entries.get('_HOSTNAME', '')} {identifier}: {entries.get('MESSAGE', '')}"
async def journal_logs_reader(
journal_logs: ClientResponse,
log_formatter: LogFormatter = LogFormatter.PLAIN,
no_colors: bool = False,
journal_logs: ClientResponse, log_formatter: LogFormatter = LogFormatter.PLAIN
) -> AsyncGenerator[tuple[str | None, str]]:
"""Read logs from systemd journal line by line, formatted using the given formatter.
Optionally strip ANSI color codes from the entries' messages.
Returns a generator of (cursor, formatted_entry) tuples.
"""
match log_formatter:
@@ -102,10 +84,7 @@ async def journal_logs_reader(
# at EOF (likely race between at_eof and EOF check in readuntil)
if line == b"\n" or not line:
if entries:
yield (
entries.get("__CURSOR"),
formatter_(entries, no_colors=no_colors),
)
yield entries.get("__CURSOR"), formatter_(entries)
entries = {}
continue

View File

@@ -1 +1,95 @@
"""Test for API calls."""
from unittest.mock import AsyncMock, MagicMock
from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from supervisor.host.const import LogFormat
DEFAULT_LOG_RANGE = "entries=:-99:100"
DEFAULT_LOG_RANGE_FOLLOW = "entries=:-99:18446744073709551615"
async def common_test_api_advanced_logs(
path_prefix: str,
syslog_identifier: str,
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available: None,
):
"""Template for tests of endpoints using advanced logs."""
resp = await api_client.get(f"{path_prefix}/logs")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier},
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/follow")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier, "follow": ""},
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
mock_response = MagicMock()
mock_response.text = AsyncMock(
return_value='{"CONTAINER_LOG_EPOCH": "12345"}\n{"CONTAINER_LOG_EPOCH": "12345"}\n'
)
journald_logs.return_value.__aenter__.return_value = mock_response
resp = await api_client.get(f"{path_prefix}/logs/latest")
assert resp.status == 200
assert journald_logs.call_count == 2
# Check the first call for getting epoch
epoch_call = journald_logs.call_args_list[0]
assert epoch_call[1]["params"] == {"CONTAINER_NAME": syslog_identifier}
assert epoch_call[1]["range_header"] == "entries=:-1:2"
# Check the second call for getting logs with the epoch
logs_call = journald_logs.call_args_list[1]
assert logs_call[1]["params"]["SYSLOG_IDENTIFIER"] == syslog_identifier
assert logs_call[1]["params"]["CONTAINER_LOG_EPOCH"] == "12345"
assert logs_call[1]["range_header"] == "entries=:0:18446744073709551615"
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/boots/0")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier, "_BOOT_ID": "ccc"},
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/boots/0/follow")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={
"SYSLOG_IDENTIFIER": syslog_identifier,
"_BOOT_ID": "ccc",
"follow": "",
},
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)

View File

@@ -1,133 +0,0 @@
"""Fixtures for API tests."""
from collections.abc import Awaitable, Callable
from unittest.mock import ANY, AsyncMock, MagicMock
from aiohttp.test_utils import TestClient
import pytest
from supervisor.coresys import CoreSys
from supervisor.host.const import LogFormat, LogFormatter
DEFAULT_LOG_RANGE = "entries=:-99:100"
DEFAULT_LOG_RANGE_FOLLOW = "entries=:-99:18446744073709551615"
async def _common_test_api_advanced_logs(
path_prefix: str,
syslog_identifier: str,
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available: None,
journal_logs_reader: MagicMock,
):
"""Template for tests of endpoints using advanced logs."""
resp = await api_client.get(f"{path_prefix}/logs")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier},
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/follow")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier, "follow": ""},
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.PLAIN, False)
journald_logs.reset_mock()
journal_logs_reader.reset_mock()
mock_response = MagicMock()
mock_response.text = AsyncMock(
return_value='{"CONTAINER_LOG_EPOCH": "12345"}\n{"CONTAINER_LOG_EPOCH": "12345"}\n'
)
journald_logs.return_value.__aenter__.return_value = mock_response
resp = await api_client.get(f"{path_prefix}/logs/latest")
assert resp.status == 200
assert journald_logs.call_count == 2
# Check the first call for getting epoch
epoch_call = journald_logs.call_args_list[0]
assert epoch_call[1]["params"] == {"CONTAINER_NAME": syslog_identifier}
assert epoch_call[1]["range_header"] == "entries=:-1:2"
# Check the second call for getting logs with the epoch
logs_call = journald_logs.call_args_list[1]
assert logs_call[1]["params"]["SYSLOG_IDENTIFIER"] == syslog_identifier
assert logs_call[1]["params"]["CONTAINER_LOG_EPOCH"] == "12345"
assert logs_call[1]["range_header"] == "entries=:0:18446744073709551615"
journal_logs_reader.assert_called_with(ANY, LogFormatter.PLAIN, True)
journald_logs.reset_mock()
journal_logs_reader.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/boots/0")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": syslog_identifier, "_BOOT_ID": "ccc"},
range_header=DEFAULT_LOG_RANGE,
accept=LogFormat.JOURNAL,
)
journald_logs.reset_mock()
resp = await api_client.get(f"{path_prefix}/logs/boots/0/follow")
assert resp.status == 200
assert resp.content_type == "text/plain"
journald_logs.assert_called_once_with(
params={
"SYSLOG_IDENTIFIER": syslog_identifier,
"_BOOT_ID": "ccc",
"follow": "",
},
range_header=DEFAULT_LOG_RANGE_FOLLOW,
accept=LogFormat.JOURNAL,
)
@pytest.fixture
async def advanced_logs_tester(
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available,
journal_logs_reader: MagicMock,
) -> Callable[[str, str], Awaitable[None]]:
"""Fixture that returns a function to test advanced logs endpoints.
This allows tests to avoid explicitly passing all the required fixtures.
Usage:
async def test_my_logs(advanced_logs_tester):
await advanced_logs_tester("/path/prefix", "syslog_identifier")
"""
async def test_logs(path_prefix: str, syslog_identifier: str):
await _common_test_api_advanced_logs(
path_prefix,
syslog_identifier,
api_client,
journald_logs,
coresys,
os_available,
journal_logs_reader,
)
return test_logs

View File

@@ -20,6 +20,7 @@ from supervisor.exceptions import HassioError
from supervisor.store.repository import Repository
from ..const import TEST_ADDON_SLUG
from . import common_test_api_advanced_logs
def _create_test_event(name: str, state: ContainerState) -> DockerContainerStateEvent:
@@ -71,11 +72,21 @@ async def test_addons_info_not_installed(
async def test_api_addon_logs(
advanced_logs_tester,
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available,
install_addon_ssh: Addon,
):
"""Test addon logs."""
await advanced_logs_tester("/addons/local_ssh", "addon_local_ssh")
await common_test_api_advanced_logs(
"/addons/local_ssh",
"addon_local_ssh",
api_client,
journald_logs,
coresys,
os_available,
)
async def test_api_addon_logs_not_installed(api_client: TestClient):

View File

@@ -1,6 +1,18 @@
"""Test audio api."""
from unittest.mock import MagicMock
async def test_api_audio_logs(advanced_logs_tester) -> None:
from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from tests.api import common_test_api_advanced_logs
async def test_api_audio_logs(
api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
"""Test audio logs."""
await advanced_logs_tester("/audio", "hassio_audio")
await common_test_api_advanced_logs(
"/audio", "hassio_audio", api_client, journald_logs, coresys, os_available
)

View File

@@ -1,12 +1,13 @@
"""Test DNS API."""
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from supervisor.dbus.resolved import Resolved
from tests.api import common_test_api_advanced_logs
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.resolved import Resolved as ResolvedService
@@ -65,6 +66,15 @@ async def test_options(api_client: TestClient, coresys: CoreSys):
restart.assert_called_once()
async def test_api_dns_logs(advanced_logs_tester):
async def test_api_dns_logs(
api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
"""Test dns logs."""
await advanced_logs_tester("/dns", "hassio_dns")
await common_test_api_advanced_logs(
"/dns",
"hassio_dns",
api_client,
journald_logs,
coresys,
os_available,
)

View File

@@ -17,19 +17,29 @@ from supervisor.homeassistant.api import APIState, HomeAssistantAPI
from supervisor.homeassistant.const import WSEvent
from supervisor.homeassistant.core import HomeAssistantCore
from supervisor.homeassistant.module import HomeAssistant
from supervisor.resolution.const import ContextType, IssueType
from supervisor.resolution.data import Issue
from tests.api import common_test_api_advanced_logs
from tests.common import AsyncIterator, load_json_fixture
@pytest.mark.parametrize("legacy_route", [True, False])
async def test_api_core_logs(
advanced_logs_tester: AsyncMock,
api_client: TestClient,
journald_logs: MagicMock,
coresys: CoreSys,
os_available,
legacy_route: bool,
):
"""Test core logs."""
await advanced_logs_tester(
await common_test_api_advanced_logs(
f"/{'homeassistant' if legacy_route else 'core'}",
"homeassistant",
api_client,
journald_logs,
coresys,
os_available,
)
@@ -359,3 +369,73 @@ async def test_api_progress_updates_home_assistant_update(
"done": True,
},
]
async def test_update_frontend_check_success(api_client: TestClient, coresys: CoreSys):
"""Test that update succeeds when frontend check passes."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2025.8.0")
with (
patch.object(
DockerHomeAssistant,
"version",
new=PropertyMock(return_value=AwesomeVersion("2025.8.0")),
),
patch.object(
HomeAssistantAPI, "get_config", return_value={"components": ["frontend"]}
),
patch.object(HomeAssistantAPI, "check_frontend_available", return_value=True),
):
resp = await api_client.post("/core/update", json={"version": "2025.8.3"})
assert resp.status == 200
async def test_update_frontend_check_fails_triggers_rollback(
api_client: TestClient,
coresys: CoreSys,
caplog: pytest.LogCaptureFixture,
tmp_supervisor_data: Path,
):
"""Test that update triggers rollback when frontend check fails."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2025.8.0")
# Mock successful first update, failed frontend check, then successful rollback
update_call_count = 0
async def mock_update(*args, **kwargs):
nonlocal update_call_count
update_call_count += 1
if update_call_count == 1:
# First update succeeds
coresys.homeassistant.version = AwesomeVersion("2025.8.3")
elif update_call_count == 2:
# Rollback succeeds
coresys.homeassistant.version = AwesomeVersion("2025.8.0")
with (
patch.object(DockerInterface, "update", new=mock_update),
patch.object(
DockerHomeAssistant,
"version",
new=PropertyMock(return_value=AwesomeVersion("2025.8.0")),
),
patch.object(
HomeAssistantAPI, "get_config", return_value={"components": ["frontend"]}
),
patch.object(HomeAssistantAPI, "check_frontend_available", return_value=False),
):
resp = await api_client.post("/core/update", json={"version": "2025.8.3"})
# Update should trigger rollback, which succeeds and returns 200
assert resp.status == 200
assert "Frontend component loaded but frontend is not accessible" in caplog.text
assert "HomeAssistant update failed -> rollback!" in caplog.text
# Should have called update twice (once for update, once for rollback)
assert update_call_count == 2
# An update_rollback issue should be created
assert (
Issue(IssueType.UPDATE_ROLLBACK, ContextType.CORE) in coresys.resolution.issues
)

View File

@@ -272,7 +272,7 @@ async def test_advaced_logs_query_parameters(
range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.assert_called_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.reset_mock()
journald_logs.reset_mock()
@@ -290,7 +290,7 @@ async def test_advaced_logs_query_parameters(
range_header="entries=:-52:53",
accept=LogFormat.JOURNAL,
)
journal_logs_reader.assert_called_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.assert_called_with(ANY, LogFormatter.VERBOSE)
async def test_advanced_logs_boot_id_offset(
@@ -343,24 +343,24 @@ async def test_advanced_logs_formatters(
"""Test advanced logs formatters varying on Accept header."""
await api_client.get("/host/logs")
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.reset_mock()
headers = {"Accept": "text/x-log"}
await api_client.get("/host/logs", headers=headers)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.reset_mock()
await api_client.get("/host/logs/identifiers/test")
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.PLAIN, False)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.PLAIN)
journal_logs_reader.reset_mock()
headers = {"Accept": "text/x-log"}
await api_client.get("/host/logs/identifiers/test", headers=headers)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE, False)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
async def test_advanced_logs_errors(coresys: CoreSys, api_client: TestClient):

View File

@@ -1,6 +1,23 @@
"""Test multicast api."""
from unittest.mock import MagicMock
async def test_api_multicast_logs(advanced_logs_tester):
from aiohttp.test_utils import TestClient
from supervisor.coresys import CoreSys
from tests.api import common_test_api_advanced_logs
async def test_api_multicast_logs(
api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
"""Test multicast logs."""
await advanced_logs_tester("/multicast", "hassio_multicast")
await common_test_api_advanced_logs(
"/multicast",
"hassio_multicast",
api_client,
journald_logs,
coresys,
os_available,
)

View File

@@ -18,6 +18,7 @@ from supervisor.store.repository import Repository
from supervisor.supervisor import Supervisor
from supervisor.updater import Updater
from tests.api import common_test_api_advanced_logs
from tests.common import AsyncIterator, load_json_fixture
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.os_agent import OSAgent as OSAgentService
@@ -154,9 +155,18 @@ async def test_api_supervisor_options_diagnostics(
assert coresys.dbus.agent.diagnostics is False
async def test_api_supervisor_logs(advanced_logs_tester):
async def test_api_supervisor_logs(
api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
"""Test supervisor logs."""
await advanced_logs_tester("/supervisor", "hassio_supervisor")
await common_test_api_advanced_logs(
"/supervisor",
"hassio_supervisor",
api_client,
journald_logs,
coresys,
os_available,
)
async def test_api_supervisor_fallback(

View File

@@ -445,23 +445,28 @@ async def test_install_progress_rounding_does_not_cause_misses(
]
coresys.docker.images.pull.return_value = AsyncIterator(logs)
# Schedule job so we can listen for the end. Then we can assert against the WS mock
event = asyncio.Event()
job, install_task = coresys.jobs.schedule_job(
test_docker_interface.install,
JobSchedulerOptions(),
AwesomeVersion("1.2.3"),
"test",
)
with (
patch.object(
type(coresys.supervisor), "arch", PropertyMock(return_value="i386")
),
):
# Schedule job so we can listen for the end. Then we can assert against the WS mock
event = asyncio.Event()
job, install_task = coresys.jobs.schedule_job(
test_docker_interface.install,
JobSchedulerOptions(),
AwesomeVersion("1.2.3"),
"test",
)
async def listen_for_job_end(reference: SupervisorJob):
if reference.uuid != job.uuid:
return
event.set()
async def listen_for_job_end(reference: SupervisorJob):
if reference.uuid != job.uuid:
return
event.set()
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, listen_for_job_end)
await install_task
await event.wait()
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, listen_for_job_end)
await install_task
await event.wait()
capture_exception.assert_not_called()
@@ -659,64 +664,3 @@ async def test_install_progress_handles_layers_skipping_download(
assert job.done is True
assert job.progress == 100
capture_exception.assert_not_called()
async def test_missing_total_handled_gracefully(
coresys: CoreSys,
test_docker_interface: DockerInterface,
ha_ws_client: AsyncMock,
capture_exception: Mock,
):
"""Test missing 'total' fields in progress details handled gracefully."""
coresys.core.set_state(CoreState.RUNNING)
# Progress details with missing 'total' fields observed in real-world pulls
logs = [
{
"status": "Pulling from home-assistant/odroid-n2-homeassistant",
"id": "2025.7.1",
},
{"status": "Pulling fs layer", "progressDetail": {}, "id": "1e214cd6d7d0"},
{
"status": "Downloading",
"progressDetail": {"current": 436480882},
"progress": "[===================================================] 436.5MB/436.5MB",
"id": "1e214cd6d7d0",
},
{"status": "Verifying Checksum", "progressDetail": {}, "id": "1e214cd6d7d0"},
{"status": "Download complete", "progressDetail": {}, "id": "1e214cd6d7d0"},
{
"status": "Extracting",
"progressDetail": {"current": 436480882},
"progress": "[===================================================] 436.5MB/436.5MB",
"id": "1e214cd6d7d0",
},
{"status": "Pull complete", "progressDetail": {}, "id": "1e214cd6d7d0"},
{
"status": "Digest: sha256:7d97da645f232f82a768d0a537e452536719d56d484d419836e53dbe3e4ec736"
},
{
"status": "Status: Downloaded newer image for ghcr.io/home-assistant/odroid-n2-homeassistant:2025.7.1"
},
]
coresys.docker.images.pull.return_value = AsyncIterator(logs)
# Schedule job so we can listen for the end. Then we can assert against the WS mock
event = asyncio.Event()
job, install_task = coresys.jobs.schedule_job(
test_docker_interface.install,
JobSchedulerOptions(),
AwesomeVersion("1.2.3"),
"test",
)
async def listen_for_job_end(reference: SupervisorJob):
if reference.uuid != job.uuid:
return
event.set()
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, listen_for_job_end)
await install_task
await event.wait()
capture_exception.assert_not_called()

View File

@@ -0,0 +1,110 @@
"""Test Home Assistant API."""
from contextlib import asynccontextmanager
from unittest.mock import MagicMock, patch
from aiohttp import hdrs
from awesomeversion import AwesomeVersion
import pytest
from supervisor.coresys import CoreSys
from supervisor.exceptions import HomeAssistantAPIError
from supervisor.homeassistant.const import LANDINGPAGE
async def test_check_frontend_available_success(coresys: CoreSys):
"""Test frontend availability check succeeds with valid HTML response."""
coresys.homeassistant.version = AwesomeVersion("2025.8.0")
mock_response = MagicMock()
mock_response.status = 200
mock_response.headers = {hdrs.CONTENT_TYPE: "text/html; charset=utf-8"}
@asynccontextmanager
async def mock_make_request(*args, **kwargs):
yield mock_response
with patch.object(
type(coresys.homeassistant.api), "make_request", new=mock_make_request
):
result = await coresys.homeassistant.api.check_frontend_available()
assert result is True
async def test_check_frontend_available_wrong_status(coresys: CoreSys):
"""Test frontend availability check fails with non-200 status."""
coresys.homeassistant.version = AwesomeVersion("2025.8.0")
mock_response = MagicMock()
mock_response.status = 404
mock_response.headers = {hdrs.CONTENT_TYPE: "text/html"}
@asynccontextmanager
async def mock_make_request(*args, **kwargs):
yield mock_response
with patch.object(
type(coresys.homeassistant.api), "make_request", new=mock_make_request
):
result = await coresys.homeassistant.api.check_frontend_available()
assert result is False
async def test_check_frontend_available_wrong_content_type(
coresys: CoreSys, caplog: pytest.LogCaptureFixture
):
"""Test frontend availability check fails with wrong content type."""
coresys.homeassistant.version = AwesomeVersion("2025.8.0")
mock_response = MagicMock()
mock_response.status = 200
mock_response.headers = {hdrs.CONTENT_TYPE: "application/json"}
@asynccontextmanager
async def mock_make_request(*args, **kwargs):
yield mock_response
with patch.object(
type(coresys.homeassistant.api), "make_request", new=mock_make_request
):
result = await coresys.homeassistant.api.check_frontend_available()
assert result is False
assert "unexpected content type" in caplog.text
async def test_check_frontend_available_api_error(coresys: CoreSys):
"""Test frontend availability check handles API errors gracefully."""
coresys.homeassistant.version = AwesomeVersion("2025.8.0")
@asynccontextmanager
async def mock_make_request(*args, **kwargs):
raise HomeAssistantAPIError("Connection failed")
yield # pragma: no cover
with patch.object(
type(coresys.homeassistant.api), "make_request", new=mock_make_request
):
result = await coresys.homeassistant.api.check_frontend_available()
assert result is False
async def test_check_frontend_available_landingpage(coresys: CoreSys):
"""Test frontend availability check returns False for landingpage."""
coresys.homeassistant.version = LANDINGPAGE
result = await coresys.homeassistant.api.check_frontend_available()
assert result is False
async def test_check_frontend_available_no_version(coresys: CoreSys):
"""Test frontend availability check returns False when no version set."""
coresys.homeassistant.version = None
result = await coresys.homeassistant.api.check_frontend_available()
assert result is False

View File

@@ -90,49 +90,6 @@ async def test_logs_coloured(journald_gateway: MagicMock, coresys: CoreSys):
)
async def test_logs_no_colors(journald_gateway: MagicMock, coresys: CoreSys):
"""Test ANSI color codes being stripped when no_colors=True."""
journald_gateway.content.feed_data(
load_fixture("logs_export_supervisor.txt").encode("utf-8")
)
journald_gateway.content.feed_eof()
async with coresys.host.logs.journald_logs() as resp:
cursor, line = await anext(journal_logs_reader(resp, no_colors=True))
assert (
cursor
== "s=83fee99ca0c3466db5fc120d52ca7dd8;i=2049389;b=f5a5c442fa6548cf97474d2d57c920b3;m=4263828e8c;t=612dda478b01b;x=9ae12394c9326930"
)
# Colors should be stripped
assert (
line == "24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor"
)
async def test_logs_verbose_no_colors(journald_gateway: MagicMock, coresys: CoreSys):
"""Test ANSI color codes being stripped from verbose formatted logs when no_colors=True."""
journald_gateway.content.feed_data(
load_fixture("logs_export_supervisor.txt").encode("utf-8")
)
journald_gateway.content.feed_eof()
async with coresys.host.logs.journald_logs() as resp:
cursor, line = await anext(
journal_logs_reader(
resp, log_formatter=LogFormatter.VERBOSE, no_colors=True
)
)
assert (
cursor
== "s=83fee99ca0c3466db5fc120d52ca7dd8;i=2049389;b=f5a5c442fa6548cf97474d2d57c920b3;m=4263828e8c;t=612dda478b01b;x=9ae12394c9326930"
)
# Colors should be stripped in verbose format too
assert (
line
== "2024-03-04 22:56:56.709 ha-hloub hassio_supervisor[466]: 24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor"
)
async def test_boot_ids(
journald_gateway: MagicMock,
coresys: CoreSys,

View File

@@ -1179,6 +1179,7 @@ async def test_job_scheduled_delay(coresys: CoreSys):
async def test_job_scheduled_at(coresys: CoreSys):
"""Test job that schedules a job to start at a specified time."""
dt = datetime.now()
class TestClass:
"""Test class."""
@@ -1188,12 +1189,10 @@ async def test_job_scheduled_at(coresys: CoreSys):
self.coresys = coresys
@Job(name="test_job_scheduled_at_job_scheduler")
async def job_scheduler(
self, scheduled_time: datetime
) -> tuple[SupervisorJob, asyncio.TimerHandle]:
async def job_scheduler(self) -> tuple[SupervisorJob, asyncio.TimerHandle]:
"""Schedule a job to run at specified time."""
return self.coresys.jobs.schedule_job(
self.job_task, JobSchedulerOptions(start_at=scheduled_time)
self.job_task, JobSchedulerOptions(start_at=dt + timedelta(seconds=0.1))
)
@Job(name="test_job_scheduled_at_job_task")
@@ -1202,28 +1201,29 @@ async def test_job_scheduled_at(coresys: CoreSys):
self.coresys.jobs.current.stage = "work"
test = TestClass(coresys)
# Schedule job to run 0.1 seconds from now
scheduled_time = datetime.now() + timedelta(seconds=0.1)
job, _ = await test.job_scheduler(scheduled_time)
started = False
ended = False
job_started = asyncio.Event()
job_ended = asyncio.Event()
async def start_listener(evt_job: SupervisorJob):
nonlocal started
started = started or evt_job.uuid == job.uuid
if evt_job.uuid == job.uuid:
job_started.set()
async def end_listener(evt_job: SupervisorJob):
nonlocal ended
ended = ended or evt_job.uuid == job.uuid
if evt_job.uuid == job.uuid:
job_ended.set()
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_START, start_listener)
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, end_listener)
async with time_machine.travel(dt):
job, _ = await test.job_scheduler()
await asyncio.sleep(0.2)
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_START, start_listener)
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, end_listener)
# Advance time to exactly when job should start and wait for completion
async with time_machine.travel(dt + timedelta(seconds=0.1)):
await asyncio.wait_for(
asyncio.gather(job_started.wait(), job_ended.wait()), timeout=1.0
)
assert started
assert ended
assert job.done
assert job.name == "test_job_scheduled_at_job_task"
assert job.stage == "work"

View File

@@ -86,22 +86,6 @@ def test_format_verbose_newlines():
)
def test_format_verbose_colors():
"""Test verbose formatter with ANSI colors in message."""
fields = {
"__REALTIME_TIMESTAMP": "1379403171000000",
"_HOSTNAME": "homeassistant",
"SYSLOG_IDENTIFIER": "python",
"_PID": "666",
"MESSAGE": "\x1b[32mHello, world!\x1b[0m",
}
assert (
journal_verbose_formatter(fields)
== "2013-09-17 07:32:51.000 homeassistant python[666]: \x1b[32mHello, world!\x1b[0m"
)
async def test_parsing_simple():
"""Test plain formatter."""
journal_logs, stream = _journal_logs_mock()
@@ -313,54 +297,3 @@ async def test_parsing_non_utf8_in_binary_message():
)
_, line = await anext(journal_logs_reader(journal_logs))
assert line == "Hello, \ufffd world!"
def test_format_plain_no_colors():
"""Test plain formatter strips ANSI color codes when no_colors=True."""
fields = {"MESSAGE": "\x1b[32mHello, world!\x1b[0m"}
assert journal_plain_formatter(fields, no_colors=True) == "Hello, world!"
def test_format_verbose_no_colors():
"""Test verbose formatter strips ANSI color codes when no_colors=True."""
fields = {
"__REALTIME_TIMESTAMP": "1379403171000000",
"_HOSTNAME": "homeassistant",
"SYSLOG_IDENTIFIER": "python",
"_PID": "666",
"MESSAGE": "\x1b[32mHello, world!\x1b[0m",
}
assert (
journal_verbose_formatter(fields, no_colors=True)
== "2013-09-17 07:32:51.000 homeassistant python[666]: Hello, world!"
)
async def test_parsing_colored_logs_verbose_no_colors():
"""Test verbose formatter strips colors from colored logs."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"__REALTIME_TIMESTAMP=1379403171000000\n"
b"_HOSTNAME=homeassistant\n"
b"SYSLOG_IDENTIFIER=python\n"
b"_PID=666\n"
b"MESSAGE\n\x0e\x00\x00\x00\x00\x00\x00\x00\x1b[31mERROR\x1b[0m\n"
b"AFTER=after\n\n"
)
_, line = await anext(
journal_logs_reader(
journal_logs, log_formatter=LogFormatter.VERBOSE, no_colors=True
)
)
assert line == "2013-09-17 07:32:51.000 homeassistant python[666]: ERROR"
async def test_parsing_multiple_color_codes():
"""Test stripping multiple ANSI color codes in single message."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"MESSAGE\n\x29\x00\x00\x00\x00\x00\x00\x00\x1b[31mRed\x1b[0m \x1b[32mGreen\x1b[0m \x1b[34mBlue\x1b[0m\n"
b"AFTER=after\n\n"
)
_, line = await anext(journal_logs_reader(journal_logs, no_colors=True))
assert line == "Red Green Blue"