Use Journal Export Format for host (advanced) logs (#4963)

* Use Journal Export Format for host (advanced) logs

Add methods for handling Journal Export Format and use it for fetching
of host logs. This is foundation for colored streaming logs for other
endpoints as well.

* Make pylint happier - remove extra pass statement

* Rewrite journal gateway tests to mock ClientResponse's StreamReader

* Handle connection refused error when connecting to journal-gatewayd

* Use SYSTEMD_JOURNAL_GATEWAYD_SOCKET global path also for connection

* Use parsing algorithm suggested by @agners in review

* Fix timestamps in formatting, always use UTC for now

* Add tests for Accept header in host logs

* Apply suggestions from @agners

Co-authored-by: Stefan Agner <stefan@agner.ch>

* Bail out of parsing earlier if field is not in required fields

* Fix parsing issue discovered in the wild and add test case

* Make verbose formatter more tolerant

* Use some bytes' native functions for some minor optimizations

* Move MalformedBinaryEntryError to exceptions module, add test for it

---------

Co-authored-by: Stefan Agner <stefan@agner.ch>
This commit is contained in:
Jan Čermák 2024-03-20 09:00:45 +01:00 committed by GitHub
parent 0e0fadd72d
commit 0814552b2a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 519 additions and 41 deletions

View File

@ -8,6 +8,7 @@ CONTENT_TYPE_PNG = "image/png"
CONTENT_TYPE_TAR = "application/tar" CONTENT_TYPE_TAR = "application/tar"
CONTENT_TYPE_TEXT = "text/plain" CONTENT_TYPE_TEXT = "text/plain"
CONTENT_TYPE_URL = "application/x-www-form-urlencoded" CONTENT_TYPE_URL = "application/x-www-form-urlencoded"
CONTENT_TYPE_X_LOG = "text/x-log"
COOKIE_INGRESS = "ingress_session" COOKIE_INGRESS = "ingress_session"

View File

@ -28,7 +28,14 @@ from ..const import (
) )
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..exceptions import APIError, HostLogError from ..exceptions import APIError, HostLogError
from ..host.const import PARAM_BOOT_ID, PARAM_FOLLOW, PARAM_SYSLOG_IDENTIFIER from ..host.const import (
PARAM_BOOT_ID,
PARAM_FOLLOW,
PARAM_SYSLOG_IDENTIFIER,
LogFormat,
LogFormatter,
)
from ..utils.systemd_journal import journal_logs_reader
from .const import ( from .const import (
ATTR_AGENT_VERSION, ATTR_AGENT_VERSION,
ATTR_APPARMOR_VERSION, ATTR_APPARMOR_VERSION,
@ -43,6 +50,7 @@ from .const import (
ATTR_STARTUP_TIME, ATTR_STARTUP_TIME,
ATTR_USE_NTP, ATTR_USE_NTP,
CONTENT_TYPE_TEXT, CONTENT_TYPE_TEXT,
CONTENT_TYPE_X_LOG,
) )
from .utils import api_process, api_validate from .utils import api_process, api_validate
@ -158,6 +166,7 @@ class APIHost(CoreSysAttributes):
self, request: web.Request, identifier: str | None = None, follow: bool = False self, request: web.Request, identifier: str | None = None, follow: bool = False
) -> web.StreamResponse: ) -> web.StreamResponse:
"""Return systemd-journald logs.""" """Return systemd-journald logs."""
log_formatter = LogFormatter.PLAIN
params = {} params = {}
if identifier: if identifier:
params[PARAM_SYSLOG_IDENTIFIER] = identifier params[PARAM_SYSLOG_IDENTIFIER] = identifier
@ -165,6 +174,8 @@ class APIHost(CoreSysAttributes):
params[PARAM_SYSLOG_IDENTIFIER] = request.match_info.get(IDENTIFIER) params[PARAM_SYSLOG_IDENTIFIER] = request.match_info.get(IDENTIFIER)
else: else:
params[PARAM_SYSLOG_IDENTIFIER] = self.sys_host.logs.default_identifiers params[PARAM_SYSLOG_IDENTIFIER] = self.sys_host.logs.default_identifiers
# host logs should be always verbose, no matter what Accept header is used
log_formatter = LogFormatter.VERBOSE
if BOOTID in request.match_info: if BOOTID in request.match_info:
params[PARAM_BOOT_ID] = await self._get_boot_id( params[PARAM_BOOT_ID] = await self._get_boot_id(
@ -175,26 +186,31 @@ class APIHost(CoreSysAttributes):
if ACCEPT in request.headers and request.headers[ACCEPT] not in [ if ACCEPT in request.headers and request.headers[ACCEPT] not in [
CONTENT_TYPE_TEXT, CONTENT_TYPE_TEXT,
CONTENT_TYPE_X_LOG,
"*/*", "*/*",
]: ]:
raise APIError( raise APIError(
"Invalid content type requested. Only text/plain supported for now." "Invalid content type requested. Only text/plain and text/x-log "
"supported for now."
) )
if request.headers[ACCEPT] == CONTENT_TYPE_X_LOG:
log_formatter = LogFormatter.VERBOSE
if RANGE in request.headers: if RANGE in request.headers:
range_header = request.headers.get(RANGE) range_header = request.headers.get(RANGE)
else: else:
range_header = f"entries=:-{DEFAULT_RANGE}:" range_header = f"entries=:-{DEFAULT_RANGE}:"
async with self.sys_host.logs.journald_logs( async with self.sys_host.logs.journald_logs(
params=params, range_header=range_header params=params, range_header=range_header, accept=LogFormat.JOURNAL
) as resp: ) as resp:
try: try:
response = web.StreamResponse() response = web.StreamResponse()
response.content_type = CONTENT_TYPE_TEXT response.content_type = CONTENT_TYPE_TEXT
await response.prepare(request) await response.prepare(request)
async for data in resp.content: async for line in journal_logs_reader(resp, log_formatter):
await response.write(data) await response.write(line.encode("utf-8") + b"\n")
except ConnectionResetError as ex: except ConnectionResetError as ex:
raise APIError( raise APIError(
"Connection reset when trying to fetch data from systemd-journald." "Connection reset when trying to fetch data from systemd-journald."

View File

@ -509,6 +509,17 @@ class WhoamiConnectivityError(WhoamiError):
"""Connectivity errors while using whoami.""" """Connectivity errors while using whoami."""
# utils/systemd_journal
class SystemdJournalError(HassioError):
"""Error while processing systemd journal logs."""
class MalformedBinaryEntryError(SystemdJournalError):
"""Raised when binary entry in the journal isn't followed by a newline."""
# docker/api # docker/api

View File

@ -62,3 +62,10 @@ class LogFormat(StrEnum):
JOURNAL = "application/vnd.fdo.journal" JOURNAL = "application/vnd.fdo.journal"
JSON = "application/json" JSON = "application/json"
TEXT = "text/plain" TEXT = "text/plain"
class LogFormatter(StrEnum):
"""Log formatter."""
PLAIN = "plain"
VERBOSE = "verbose"

View File

@ -7,12 +7,18 @@ import logging
from pathlib import Path from pathlib import Path
from aiohttp import ClientError, ClientSession, ClientTimeout from aiohttp import ClientError, ClientSession, ClientTimeout
from aiohttp.client_exceptions import UnixClientConnectorError
from aiohttp.client_reqrep import ClientResponse from aiohttp.client_reqrep import ClientResponse
from aiohttp.connector import UnixConnector from aiohttp.connector import UnixConnector
from aiohttp.hdrs import ACCEPT, RANGE from aiohttp.hdrs import ACCEPT, RANGE
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import ConfigurationFileError, HostLogError, HostNotSupportedError from ..exceptions import (
ConfigurationFileError,
HostLogError,
HostNotSupportedError,
HostServiceError,
)
from ..utils.json import read_json_file from ..utils.json import read_json_file
from .const import PARAM_BOOT_ID, PARAM_SYSLOG_IDENTIFIER, LogFormat from .const import PARAM_BOOT_ID, PARAM_SYSLOG_IDENTIFIER, LogFormat
@ -138,16 +144,21 @@ class LogsControl(CoreSysAttributes):
"No systemd-journal-gatewayd Unix socket available", _LOGGER.error "No systemd-journal-gatewayd Unix socket available", _LOGGER.error
) )
async with ClientSession( try:
connector=UnixConnector(path="/run/systemd-journal-gatewayd.sock") async with ClientSession(
) as session: connector=UnixConnector(path=str(SYSTEMD_JOURNAL_GATEWAYD_SOCKET))
headers = {ACCEPT: accept} ) as session:
if range_header: headers = {ACCEPT: accept}
headers[RANGE] = range_header if range_header:
async with session.get( headers[RANGE] = range_header
f"http://localhost{path}", async with session.get(
headers=headers, f"http://localhost{path}",
params=params or {}, headers=headers,
timeout=timeout, params=params or {},
) as client_response: timeout=timeout,
yield client_response ) as client_response:
yield client_response
except UnixClientConnectorError as ex:
raise HostServiceError(
"Unable to connect to systemd-journal-gatewayd", _LOGGER.error
) from ex

View File

@ -0,0 +1,113 @@
"""Utilities for working with systemd journal export format."""
from collections.abc import AsyncGenerator
from datetime import UTC, datetime
from functools import wraps
from aiohttp import ClientResponse
from supervisor.exceptions import MalformedBinaryEntryError
from supervisor.host.const import LogFormatter
def formatter(required_fields: list[str]):
"""Decorate journal entry formatters with list of required fields.
Helper decorator that can be used for getting list of required fields for a journal
formatter function using function.required_fields function attribute.
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper.required_fields = required_fields
return wrapper
return decorator
@formatter(["MESSAGE"])
def journal_plain_formatter(entries: dict[str, str]) -> str:
"""Format parsed journal entries as a plain message."""
return entries["MESSAGE"]
@formatter(
[
"__REALTIME_TIMESTAMP",
"_HOSTNAME",
"SYSLOG_IDENTIFIER",
"_PID",
"MESSAGE",
]
)
def journal_verbose_formatter(entries: dict[str, str]) -> str:
"""Format parsed journal entries to a journalctl-like format."""
ts = datetime.fromtimestamp(
int(entries["__REALTIME_TIMESTAMP"]) / 1e6, UTC
).isoformat(sep=" ", timespec="milliseconds")
ts = ts[: ts.index(".") + 4] # strip TZ offset
identifier = (
f"{entries.get("SYSLOG_IDENTIFIER", "_UNKNOWN_")}[{entries["_PID"]}]"
if "_PID" in entries
else entries.get("SYSLOG_IDENTIFIER", "_UNKNOWN_")
)
return f"{ts} {entries.get("_HOSTNAME", "")} {identifier}: {entries.get("MESSAGE", "")}"
async def journal_logs_reader(
journal_logs: ClientResponse,
log_formatter: LogFormatter = LogFormatter.PLAIN,
) -> AsyncGenerator[str, None]:
"""Read logs from systemd journal line by line, formatted using the given formatter."""
match log_formatter:
case LogFormatter.PLAIN:
formatter_ = journal_plain_formatter
case LogFormatter.VERBOSE:
formatter_ = journal_verbose_formatter
case _:
raise ValueError(f"Unknown log format: {log_formatter}")
async with journal_logs as resp:
entries: dict[str, str] = {}
while not resp.content.at_eof():
line = await resp.content.readuntil(b"\n")
# newline means end of message:
if line == b"\n":
yield formatter_(entries)
entries = {}
continue
# Journal fields consisting only of valid non-control UTF-8 codepoints
# are serialized as they are (i.e. the field name, followed by '=',
# followed by field data), followed by a newline as separator to the next
# field. Note that fields containing newlines cannot be formatted like
# this. Non-control UTF-8 codepoints are the codepoints with value at or
# above 32 (' '), or equal to 9 (TAB).
name, sep, data = line.partition(b"=")
if not sep:
# Other journal fields are serialized in a special binary safe way:
# field name, followed by newline
name = name[:-1] # strip \n
# followed by a binary 64-bit little endian size value,
length_raw = await resp.content.readexactly(8)
length = int.from_bytes(length_raw, byteorder="little")
# followed by the binary field data,
data = await resp.content.readexactly(length + 1)
# followed by a newline as separator to the next field.
if not data.endswith(b"\n"):
raise MalformedBinaryEntryError(
f"Failed parsing binary entry {data}"
)
name = name.decode("utf-8")
if name not in formatter_.required_fields:
# we must read to the end of the entry in the stream, so we can
# only continue the loop here
continue
# strip \n for simple fields before decoding
entries[name] = data[:-1].decode("utf-8")

View File

@ -1,12 +1,13 @@
"""Test Host API.""" """Test Host API."""
from unittest.mock import MagicMock from unittest.mock import ANY, MagicMock
from aiohttp.test_utils import TestClient from aiohttp.test_utils import TestClient
import pytest import pytest
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.dbus.resolved import Resolved from supervisor.dbus.resolved import Resolved
from supervisor.host.const import LogFormat, LogFormatter
DEFAULT_RANGE = "entries=:-100:" DEFAULT_RANGE = "entries=:-100:"
# pylint: disable=protected-access # pylint: disable=protected-access
@ -154,6 +155,7 @@ async def test_advanced_logs(
journald_logs.assert_called_once_with( journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers}, params={"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers},
range_header=DEFAULT_RANGE, range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
) )
journald_logs.reset_mock() journald_logs.reset_mock()
@ -161,7 +163,9 @@ async def test_advanced_logs(
identifier = "dropbear" identifier = "dropbear"
await api_client.get(f"/host/logs/identifiers/{identifier}") await api_client.get(f"/host/logs/identifiers/{identifier}")
journald_logs.assert_called_once_with( journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": identifier}, range_header=DEFAULT_RANGE params={"SYSLOG_IDENTIFIER": identifier},
range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
) )
journald_logs.reset_mock() journald_logs.reset_mock()
@ -174,6 +178,7 @@ async def test_advanced_logs(
"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers,
}, },
range_header=DEFAULT_RANGE, range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
) )
journald_logs.reset_mock() journald_logs.reset_mock()
@ -182,6 +187,7 @@ async def test_advanced_logs(
journald_logs.assert_called_once_with( journald_logs.assert_called_once_with(
params={"_BOOT_ID": bootid, "SYSLOG_IDENTIFIER": identifier}, params={"_BOOT_ID": bootid, "SYSLOG_IDENTIFIER": identifier},
range_header=DEFAULT_RANGE, range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
) )
journald_logs.reset_mock() journald_logs.reset_mock()
@ -191,6 +197,7 @@ async def test_advanced_logs(
journald_logs.assert_called_once_with( journald_logs.assert_called_once_with(
params={"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers}, params={"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers},
range_header=headers["Range"], range_header=headers["Range"],
accept=LogFormat.JOURNAL,
) )
journald_logs.reset_mock() journald_logs.reset_mock()
@ -202,6 +209,7 @@ async def test_advanced_logs(
"follow": "", "follow": "",
}, },
range_header=DEFAULT_RANGE, range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
) )
@ -216,6 +224,7 @@ async def test_advanced_logs_boot_id_offset(
"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers,
}, },
range_header=DEFAULT_RANGE, range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
) )
journald_logs.reset_mock() journald_logs.reset_mock()
@ -227,6 +236,7 @@ async def test_advanced_logs_boot_id_offset(
"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers,
}, },
range_header=DEFAULT_RANGE, range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
) )
journald_logs.reset_mock() journald_logs.reset_mock()
@ -238,11 +248,41 @@ async def test_advanced_logs_boot_id_offset(
"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers,
}, },
range_header=DEFAULT_RANGE, range_header=DEFAULT_RANGE,
accept=LogFormat.JOURNAL,
) )
journald_logs.reset_mock() journald_logs.reset_mock()
async def test_advanced_logs_formatters(
api_client: TestClient,
coresys: CoreSys,
journald_gateway: MagicMock,
journal_logs_reader: MagicMock,
):
"""Test advanced logs formatters varying on Accept header."""
await api_client.get("/host/logs")
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.reset_mock()
headers = {"Accept": "text/x-log"}
await api_client.get("/host/logs", headers=headers)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
journal_logs_reader.reset_mock()
await api_client.get("/host/logs/identifiers/test")
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.PLAIN)
journal_logs_reader.reset_mock()
headers = {"Accept": "text/x-log"}
await api_client.get("/host/logs/identifiers/test", headers=headers)
journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE)
async def test_advanced_logs_errors(api_client: TestClient): async def test_advanced_logs_errors(api_client: TestClient):
"""Test advanced logging API errors.""" """Test advanced logging API errors."""
# coresys = coresys_logs_control # coresys = coresys_logs_control
@ -257,5 +297,5 @@ async def test_advanced_logs_errors(api_client: TestClient):
assert result["result"] == "error" assert result["result"] == "error"
assert ( assert (
result["message"] result["message"]
== "Invalid content type requested. Only text/plain supported for now." == "Invalid content type requested. Only text/plain and text/x-log supported for now."
) )

View File

@ -1,4 +1,5 @@
"""Common test functions.""" """Common test functions."""
import asyncio
from functools import partial from functools import partial
from inspect import unwrap from inspect import unwrap
import os import os
@ -47,12 +48,7 @@ from supervisor.store.addon import AddonStore
from supervisor.store.repository import Repository from supervisor.store.repository import Repository
from supervisor.utils.dt import utcnow from supervisor.utils.dt import utcnow
from .common import ( from .common import load_binary_fixture, load_json_fixture, mock_dbus_services
load_binary_fixture,
load_fixture,
load_json_fixture,
mock_dbus_services,
)
from .const import TEST_ADDON_SLUG from .const import TEST_ADDON_SLUG
from .dbus_service_mocks.base import DBusServiceMock from .dbus_service_mocks.base import DBusServiceMock
from .dbus_service_mocks.network_connection_settings import ( from .dbus_service_mocks.network_connection_settings import (
@ -408,10 +404,28 @@ async def journald_gateway() -> MagicMock:
with patch("supervisor.host.logs.Path.is_socket", return_value=True), patch( with patch("supervisor.host.logs.Path.is_socket", return_value=True), patch(
"supervisor.host.logs.ClientSession.get" "supervisor.host.logs.ClientSession.get"
) as get: ) as get:
get.return_value.__aenter__.return_value.text = AsyncMock( reader = asyncio.StreamReader(loop=asyncio.get_running_loop())
return_value=load_fixture("logs_host.txt")
async def response_text():
return (await reader.read()).decode("utf-8")
client_response = MagicMock(
content=reader,
text=response_text,
) )
yield get
get.return_value.__aenter__.return_value = client_response
get.return_value.__aenter__.return_value.__aenter__.return_value = (
client_response
)
yield reader
@pytest.fixture
async def journal_logs_reader() -> MagicMock:
"""Mock journal_logs_reader in host API."""
with patch("supervisor.api.host.journal_logs_reader") as reader:
yield reader
@pytest.fixture @pytest.fixture

36
tests/fixtures/logs_export_host.txt vendored Normal file
View File

@ -0,0 +1,36 @@
__CURSOR=s=83fee99ca0c3466db5fc120d52ca7dd8;i=203f2ce;b=f5a5c442fa6548cf97474d2d57c920b3;m=3191a3c620;t=612ccd299e7af;x=8675b540119d10bb
__REALTIME_TIMESTAMP=1709520776193967
__MONOTONIC_TIMESTAMP=212896826912
__SEQNUM=33813198
__SEQNUM_ID=83fee99ca0c3466db5fc120d52ca7dd8
_BOOT_ID=f5a5c442fa6548cf97474d2d57c920b3
PRIORITY=6
_TRANSPORT=journal
_UID=0
_GID=0
_CAP_EFFECTIVE=1ffffffffff
_MACHINE_ID=edb710b8363b4ff48aed6e75a27f85a6
_HOSTNAME=homeassistant
_RUNTIME_SCOPE=system
SYSLOG_FACILITY=37
TID=1
SYSLOG_IDENTIFIER=systemd
_PID=1
_COMM=systemd
_EXE=/usr/lib/systemd/systemd
_CMDLINE=/sbin/init
_SYSTEMD_CGROUP=/init.scope
_SYSTEMD_UNIT=init.scope
_SYSTEMD_SLICE=-.slice
CODE_FILE=src/core/job.c
JOB_TYPE=start
UNIT=systemd-hostnamed.service
CODE_LINE=768
CODE_FUNC=job_emit_done_message
MESSAGE=Started Hostname Service.
JOB_RESULT=done
MESSAGE_ID=39f53479d3a045ac8e11786248231fbf
JOB_ID=7205
INVOCATION_ID=e38ca30edd074417b011192307f10811
_SOURCE_REALTIME_TIMESTAMP=1709520776193926

Binary file not shown.

View File

@ -1,12 +1,16 @@
"""Test host logs control.""" """Test host logs control."""
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch from unittest.mock import MagicMock, PropertyMock, patch
from aiohttp.client_exceptions import UnixClientConnectorError
from aiohttp.client_reqrep import ConnectionKey
import pytest import pytest
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.exceptions import HostNotSupportedError from supervisor.exceptions import HostNotSupportedError, HostServiceError
from supervisor.host.const import LogFormatter
from supervisor.host.logs import LogsControl from supervisor.host.logs import LogsControl
from supervisor.utils.systemd_journal import journal_logs_reader
from tests.common import load_fixture from tests.common import load_fixture
@ -32,10 +36,16 @@ async def test_logs(coresys: CoreSys, journald_gateway: MagicMock):
"""Test getting logs and errors.""" """Test getting logs and errors."""
assert coresys.host.logs.available is True assert coresys.host.logs.available is True
journald_gateway.feed_data(load_fixture("logs_export_host.txt").encode("utf-8"))
journald_gateway.feed_eof()
async with coresys.host.logs.journald_logs() as resp: async with coresys.host.logs.journald_logs() as resp:
body = await resp.text() line = await anext(
journal_logs_reader(resp, log_formatter=LogFormatter.VERBOSE)
)
assert ( assert (
"Oct 11 20:46:22 odroid-dev systemd[1]: Started Hostname Service." in body line
== "2024-03-04 02:52:56.193 homeassistant systemd[1]: Started Hostname Service."
) )
with patch.object( with patch.object(
@ -45,11 +55,25 @@ async def test_logs(coresys: CoreSys, journald_gateway: MagicMock):
pass pass
async def test_logs_coloured(coresys: CoreSys, journald_gateway: MagicMock):
"""Test ANSI control sequences being preserved in binary messages."""
journald_gateway.feed_data(
load_fixture("logs_export_supervisor.txt").encode("utf-8")
)
journald_gateway.feed_eof()
async with coresys.host.logs.journald_logs() as resp:
line = await anext(journal_logs_reader(resp))
assert (
line
== "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m"
)
async def test_boot_ids(coresys: CoreSys, journald_gateway: MagicMock): async def test_boot_ids(coresys: CoreSys, journald_gateway: MagicMock):
"""Test getting boot ids.""" """Test getting boot ids."""
journald_gateway.return_value.__aenter__.return_value.text = AsyncMock( journald_gateway.feed_data(load_fixture("logs_boot_ids.txt").encode("utf-8"))
return_value=load_fixture("logs_boot_ids.txt") journald_gateway.feed_eof()
)
assert await coresys.host.logs.get_boot_ids() == TEST_BOOT_IDS assert await coresys.host.logs.get_boot_ids() == TEST_BOOT_IDS
@ -73,9 +97,8 @@ async def test_boot_ids(coresys: CoreSys, journald_gateway: MagicMock):
async def test_identifiers(coresys: CoreSys, journald_gateway: MagicMock): async def test_identifiers(coresys: CoreSys, journald_gateway: MagicMock):
"""Test getting identifiers.""" """Test getting identifiers."""
journald_gateway.return_value.__aenter__.return_value.text = AsyncMock( journald_gateway.feed_data(load_fixture("logs_identifiers.txt").encode("utf-8"))
return_value=load_fixture("logs_identifiers.txt") journald_gateway.feed_eof()
)
# Mock is large so just look for a few different types of identifiers # Mock is large so just look for a few different types of identifiers
identifiers = await coresys.host.logs.get_identifiers() identifiers = await coresys.host.logs.get_identifiers()
@ -89,3 +112,21 @@ async def test_identifiers(coresys: CoreSys, journald_gateway: MagicMock):
assert identifier in identifiers assert identifier in identifiers
assert "" not in identifiers assert "" not in identifiers
async def test_connection_refused_handled(
coresys: CoreSys, journald_gateway: MagicMock
):
"""Test connection refused is handled with HostServiceError."""
with patch("supervisor.host.logs.ClientSession.get") as get:
get.side_effect = UnixClientConnectorError(
path="/run/systemd-journal-gatewayd.sock",
connection_key=ConnectionKey(
"localhost", None, False, False, None, None, None
),
os_error=ConnectionRefusedError("Connection refused"),
)
with pytest.raises(HostServiceError):
async with coresys.host.logs.journald_logs():
pass

View File

@ -0,0 +1,188 @@
"""Test systemd journal utilities."""
import asyncio
from unittest.mock import MagicMock
import pytest
from supervisor.exceptions import MalformedBinaryEntryError
from supervisor.host.const import LogFormatter
from supervisor.utils.systemd_journal import (
journal_logs_reader,
journal_plain_formatter,
journal_verbose_formatter,
)
from tests.common import load_fixture
def _journal_logs_mock():
"""Generate mocked stream for journal_logs_reader.
Returns tuple for mocking ClientResponse and its StreamReader
(.content attribute in async context).
"""
stream = asyncio.StreamReader(loop=asyncio.get_running_loop())
journal_logs = MagicMock()
journal_logs.__aenter__.return_value.content = stream
return journal_logs, stream
def test_format_simple():
"""Test plain formatter."""
fields = {"MESSAGE": "Hello, world!"}
assert journal_plain_formatter(fields) == "Hello, world!"
def test_format_simple_newlines():
"""Test plain formatter with newlines in message."""
fields = {"MESSAGE": "Hello,\nworld!\n"}
assert journal_plain_formatter(fields) == "Hello,\nworld!\n"
def test_format_verbose_timestamp():
"""Test timestamp is properly formatted."""
fields = {
"__REALTIME_TIMESTAMP": "1000",
"_HOSTNAME": "x",
"SYSLOG_IDENTIFIER": "x",
"_PID": "1",
"MESSAGE": "x",
}
formatted = journal_verbose_formatter(fields)
assert formatted.startswith(
"1970-01-01 00:00:00.001 "
), f"Invalid log timestamp: {formatted}"
def test_format_verbose():
"""Test verbose formatter."""
fields = {
"__REALTIME_TIMESTAMP": "1379403171000000",
"_HOSTNAME": "homeassistant",
"SYSLOG_IDENTIFIER": "python",
"_PID": "666",
"MESSAGE": "Hello, world!",
}
assert (
journal_verbose_formatter(fields)
== "2013-09-17 07:32:51.000 homeassistant python[666]: Hello, world!"
)
def test_format_verbose_newlines():
"""Test verbose formatter with newlines in message."""
fields = {
"__REALTIME_TIMESTAMP": "1379403171000000",
"_HOSTNAME": "homeassistant",
"SYSLOG_IDENTIFIER": "python",
"_PID": "666",
"MESSAGE": "Hello,\nworld!\n",
}
assert (
journal_verbose_formatter(fields)
== "2013-09-17 07:32:51.000 homeassistant python[666]: Hello,\nworld!\n"
)
async def test_parsing_simple():
"""Test plain formatter."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(b"MESSAGE=Hello, world!\n\n")
line = await anext(journal_logs_reader(journal_logs))
assert line == "Hello, world!"
async def test_parsing_verbose():
"""Test verbose formatter."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"__REALTIME_TIMESTAMP=1379403171000000\n"
b"_HOSTNAME=homeassistant\n"
b"SYSLOG_IDENTIFIER=python\n"
b"_PID=666\n"
b"MESSAGE=Hello, world!\n\n"
)
line = await anext(
journal_logs_reader(journal_logs, log_formatter=LogFormatter.VERBOSE)
)
assert line == "2013-09-17 07:32:51.000 homeassistant python[666]: Hello, world!"
async def test_parsing_newlines_in_message():
"""Test reading and formatting using journal logs reader."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"ID=1\n"
b"MESSAGE\n\x0d\x00\x00\x00\x00\x00\x00\x00Hello,\nworld!\n"
b"AFTER=after\n\n"
)
line = await anext(journal_logs_reader(journal_logs))
assert line == "Hello,\nworld!"
async def test_parsing_newlines_in_multiple_fields():
"""Test entries are correctly separated with newlines in multiple fields."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"ID=1\n"
b"MESSAGE\n\x0e\x00\x00\x00\x00\x00\x00\x00Hello,\nworld!\n\n"
b"ANOTHER\n\x0e\x00\x00\x00\x00\x00\x00\x00Hello,\nworld!\n\n"
b"AFTER=after\n\n"
b"ID=2\n"
b"MESSAGE\n\x0d\x00\x00\x00\x00\x00\x00\x00Hello,\nworld!\n"
b"AFTER=after\n\n"
)
assert await anext(journal_logs_reader(journal_logs)) == "Hello,\nworld!\n"
assert await anext(journal_logs_reader(journal_logs)) == "Hello,\nworld!"
async def test_parsing_two_messages():
"""Test reading multiple messages."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"MESSAGE=Hello, world!\n"
b"ID=1\n\n"
b"MESSAGE=Hello again, world!\n"
b"ID=2\n\n"
)
stream.feed_eof()
reader = journal_logs_reader(journal_logs)
assert await anext(reader) == "Hello, world!"
assert await anext(reader) == "Hello again, world!"
with pytest.raises(StopAsyncIteration):
await anext(reader)
async def test_parsing_malformed_binary_message():
"""Test that malformed binary message raises MalformedBinaryEntryError."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(
b"ID=1\n"
b"MESSAGE\n\x0d\x00\x00\x00\x00\x00\x00\x00Hello, world!"
b"AFTER=after\n\n"
)
with pytest.raises(MalformedBinaryEntryError):
await anext(journal_logs_reader(journal_logs))
async def test_parsing_journal_host_logs():
"""Test parsing of real host logs."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(load_fixture("logs_export_host.txt").encode("utf-8"))
line = await anext(journal_logs_reader(journal_logs))
assert line == "Started Hostname Service."
async def test_parsing_colored_supervisor_logs():
"""Test parsing of real logs with ANSI escape sequences."""
journal_logs, stream = _journal_logs_mock()
stream.feed_data(load_fixture("logs_export_supervisor.txt").encode("utf-8"))
line = await anext(journal_logs_reader(journal_logs))
assert (
line
== "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m"
)