From 0814552b2ad5228d363a7913d3b711d3afb000c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20=C4=8Cerm=C3=A1k?= Date: Wed, 20 Mar 2024 09:00:45 +0100 Subject: [PATCH] Use Journal Export Format for host (advanced) logs (#4963) * Use Journal Export Format for host (advanced) logs Add methods for handling Journal Export Format and use it for fetching of host logs. This is foundation for colored streaming logs for other endpoints as well. * Make pylint happier - remove extra pass statement * Rewrite journal gateway tests to mock ClientResponse's StreamReader * Handle connection refused error when connecting to journal-gatewayd * Use SYSTEMD_JOURNAL_GATEWAYD_SOCKET global path also for connection * Use parsing algorithm suggested by @agners in review * Fix timestamps in formatting, always use UTC for now * Add tests for Accept header in host logs * Apply suggestions from @agners Co-authored-by: Stefan Agner * Bail out of parsing earlier if field is not in required fields * Fix parsing issue discovered in the wild and add test case * Make verbose formatter more tolerant * Use some bytes' native functions for some minor optimizations * Move MalformedBinaryEntryError to exceptions module, add test for it --------- Co-authored-by: Stefan Agner --- supervisor/api/const.py | 1 + supervisor/api/host.py | 26 ++- supervisor/exceptions.py | 11 ++ supervisor/host/const.py | 7 + supervisor/host/logs.py | 39 +++-- supervisor/utils/systemd_journal.py | 113 +++++++++++++ tests/api/test_host.py | 46 +++++- tests/conftest.py | 32 ++-- tests/fixtures/logs_export_host.txt | 36 +++++ tests/fixtures/logs_export_supervisor.txt | Bin 0 -> 1327 bytes tests/host/test_logs.py | 61 +++++-- tests/utils/test_systemd_journal.py | 188 ++++++++++++++++++++++ 12 files changed, 519 insertions(+), 41 deletions(-) create mode 100644 supervisor/utils/systemd_journal.py create mode 100644 tests/fixtures/logs_export_host.txt create mode 100644 tests/fixtures/logs_export_supervisor.txt create mode 100644 tests/utils/test_systemd_journal.py diff --git a/supervisor/api/const.py b/supervisor/api/const.py index e684a2a0d..80fff328a 100644 --- a/supervisor/api/const.py +++ b/supervisor/api/const.py @@ -8,6 +8,7 @@ CONTENT_TYPE_PNG = "image/png" CONTENT_TYPE_TAR = "application/tar" CONTENT_TYPE_TEXT = "text/plain" CONTENT_TYPE_URL = "application/x-www-form-urlencoded" +CONTENT_TYPE_X_LOG = "text/x-log" COOKIE_INGRESS = "ingress_session" diff --git a/supervisor/api/host.py b/supervisor/api/host.py index 60a39500b..281f1cfa3 100644 --- a/supervisor/api/host.py +++ b/supervisor/api/host.py @@ -28,7 +28,14 @@ from ..const import ( ) from ..coresys import CoreSysAttributes from ..exceptions import APIError, HostLogError -from ..host.const import PARAM_BOOT_ID, PARAM_FOLLOW, PARAM_SYSLOG_IDENTIFIER +from ..host.const import ( + PARAM_BOOT_ID, + PARAM_FOLLOW, + PARAM_SYSLOG_IDENTIFIER, + LogFormat, + LogFormatter, +) +from ..utils.systemd_journal import journal_logs_reader from .const import ( ATTR_AGENT_VERSION, ATTR_APPARMOR_VERSION, @@ -43,6 +50,7 @@ from .const import ( ATTR_STARTUP_TIME, ATTR_USE_NTP, CONTENT_TYPE_TEXT, + CONTENT_TYPE_X_LOG, ) from .utils import api_process, api_validate @@ -158,6 +166,7 @@ class APIHost(CoreSysAttributes): self, request: web.Request, identifier: str | None = None, follow: bool = False ) -> web.StreamResponse: """Return systemd-journald logs.""" + log_formatter = LogFormatter.PLAIN params = {} if identifier: params[PARAM_SYSLOG_IDENTIFIER] = identifier @@ -165,6 +174,8 @@ class APIHost(CoreSysAttributes): params[PARAM_SYSLOG_IDENTIFIER] = request.match_info.get(IDENTIFIER) else: params[PARAM_SYSLOG_IDENTIFIER] = self.sys_host.logs.default_identifiers + # host logs should be always verbose, no matter what Accept header is used + log_formatter = LogFormatter.VERBOSE if BOOTID in request.match_info: params[PARAM_BOOT_ID] = await self._get_boot_id( @@ -175,26 +186,31 @@ class APIHost(CoreSysAttributes): if ACCEPT in request.headers and request.headers[ACCEPT] not in [ CONTENT_TYPE_TEXT, + CONTENT_TYPE_X_LOG, "*/*", ]: raise APIError( - "Invalid content type requested. Only text/plain supported for now." + "Invalid content type requested. Only text/plain and text/x-log " + "supported for now." ) + if request.headers[ACCEPT] == CONTENT_TYPE_X_LOG: + log_formatter = LogFormatter.VERBOSE + if RANGE in request.headers: range_header = request.headers.get(RANGE) else: range_header = f"entries=:-{DEFAULT_RANGE}:" async with self.sys_host.logs.journald_logs( - params=params, range_header=range_header + params=params, range_header=range_header, accept=LogFormat.JOURNAL ) as resp: try: response = web.StreamResponse() response.content_type = CONTENT_TYPE_TEXT await response.prepare(request) - async for data in resp.content: - await response.write(data) + async for line in journal_logs_reader(resp, log_formatter): + await response.write(line.encode("utf-8") + b"\n") except ConnectionResetError as ex: raise APIError( "Connection reset when trying to fetch data from systemd-journald." diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index d040b7707..9267289d3 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -509,6 +509,17 @@ class WhoamiConnectivityError(WhoamiError): """Connectivity errors while using whoami.""" +# utils/systemd_journal + + +class SystemdJournalError(HassioError): + """Error while processing systemd journal logs.""" + + +class MalformedBinaryEntryError(SystemdJournalError): + """Raised when binary entry in the journal isn't followed by a newline.""" + + # docker/api diff --git a/supervisor/host/const.py b/supervisor/host/const.py index 42931cc12..ea70325ef 100644 --- a/supervisor/host/const.py +++ b/supervisor/host/const.py @@ -62,3 +62,10 @@ class LogFormat(StrEnum): JOURNAL = "application/vnd.fdo.journal" JSON = "application/json" TEXT = "text/plain" + + +class LogFormatter(StrEnum): + """Log formatter.""" + + PLAIN = "plain" + VERBOSE = "verbose" diff --git a/supervisor/host/logs.py b/supervisor/host/logs.py index 27f7cab81..0e0ef5059 100644 --- a/supervisor/host/logs.py +++ b/supervisor/host/logs.py @@ -7,12 +7,18 @@ import logging from pathlib import Path from aiohttp import ClientError, ClientSession, ClientTimeout +from aiohttp.client_exceptions import UnixClientConnectorError from aiohttp.client_reqrep import ClientResponse from aiohttp.connector import UnixConnector from aiohttp.hdrs import ACCEPT, RANGE from ..coresys import CoreSys, CoreSysAttributes -from ..exceptions import ConfigurationFileError, HostLogError, HostNotSupportedError +from ..exceptions import ( + ConfigurationFileError, + HostLogError, + HostNotSupportedError, + HostServiceError, +) from ..utils.json import read_json_file from .const import PARAM_BOOT_ID, PARAM_SYSLOG_IDENTIFIER, LogFormat @@ -138,16 +144,21 @@ class LogsControl(CoreSysAttributes): "No systemd-journal-gatewayd Unix socket available", _LOGGER.error ) - async with ClientSession( - connector=UnixConnector(path="/run/systemd-journal-gatewayd.sock") - ) as session: - headers = {ACCEPT: accept} - if range_header: - headers[RANGE] = range_header - async with session.get( - f"http://localhost{path}", - headers=headers, - params=params or {}, - timeout=timeout, - ) as client_response: - yield client_response + try: + async with ClientSession( + connector=UnixConnector(path=str(SYSTEMD_JOURNAL_GATEWAYD_SOCKET)) + ) as session: + headers = {ACCEPT: accept} + if range_header: + headers[RANGE] = range_header + async with session.get( + f"http://localhost{path}", + headers=headers, + params=params or {}, + timeout=timeout, + ) as client_response: + yield client_response + except UnixClientConnectorError as ex: + raise HostServiceError( + "Unable to connect to systemd-journal-gatewayd", _LOGGER.error + ) from ex diff --git a/supervisor/utils/systemd_journal.py b/supervisor/utils/systemd_journal.py new file mode 100644 index 000000000..1ad71bbb0 --- /dev/null +++ b/supervisor/utils/systemd_journal.py @@ -0,0 +1,113 @@ +"""Utilities for working with systemd journal export format.""" +from collections.abc import AsyncGenerator +from datetime import UTC, datetime +from functools import wraps + +from aiohttp import ClientResponse + +from supervisor.exceptions import MalformedBinaryEntryError +from supervisor.host.const import LogFormatter + + +def formatter(required_fields: list[str]): + """Decorate journal entry formatters with list of required fields. + + Helper decorator that can be used for getting list of required fields for a journal + formatter function using function.required_fields function attribute. + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + wrapper.required_fields = required_fields + return wrapper + + return decorator + + +@formatter(["MESSAGE"]) +def journal_plain_formatter(entries: dict[str, str]) -> str: + """Format parsed journal entries as a plain message.""" + return entries["MESSAGE"] + + +@formatter( + [ + "__REALTIME_TIMESTAMP", + "_HOSTNAME", + "SYSLOG_IDENTIFIER", + "_PID", + "MESSAGE", + ] +) +def journal_verbose_formatter(entries: dict[str, str]) -> str: + """Format parsed journal entries to a journalctl-like format.""" + ts = datetime.fromtimestamp( + int(entries["__REALTIME_TIMESTAMP"]) / 1e6, UTC + ).isoformat(sep=" ", timespec="milliseconds") + ts = ts[: ts.index(".") + 4] # strip TZ offset + + identifier = ( + f"{entries.get("SYSLOG_IDENTIFIER", "_UNKNOWN_")}[{entries["_PID"]}]" + if "_PID" in entries + else entries.get("SYSLOG_IDENTIFIER", "_UNKNOWN_") + ) + + return f"{ts} {entries.get("_HOSTNAME", "")} {identifier}: {entries.get("MESSAGE", "")}" + + +async def journal_logs_reader( + journal_logs: ClientResponse, + log_formatter: LogFormatter = LogFormatter.PLAIN, +) -> AsyncGenerator[str, None]: + """Read logs from systemd journal line by line, formatted using the given formatter.""" + match log_formatter: + case LogFormatter.PLAIN: + formatter_ = journal_plain_formatter + case LogFormatter.VERBOSE: + formatter_ = journal_verbose_formatter + case _: + raise ValueError(f"Unknown log format: {log_formatter}") + + async with journal_logs as resp: + entries: dict[str, str] = {} + while not resp.content.at_eof(): + line = await resp.content.readuntil(b"\n") + # newline means end of message: + if line == b"\n": + yield formatter_(entries) + entries = {} + continue + + # Journal fields consisting only of valid non-control UTF-8 codepoints + # are serialized as they are (i.e. the field name, followed by '=', + # followed by field data), followed by a newline as separator to the next + # field. Note that fields containing newlines cannot be formatted like + # this. Non-control UTF-8 codepoints are the codepoints with value at or + # above 32 (' '), or equal to 9 (TAB). + name, sep, data = line.partition(b"=") + if not sep: + # Other journal fields are serialized in a special binary safe way: + # field name, followed by newline + name = name[:-1] # strip \n + # followed by a binary 64-bit little endian size value, + length_raw = await resp.content.readexactly(8) + length = int.from_bytes(length_raw, byteorder="little") + # followed by the binary field data, + data = await resp.content.readexactly(length + 1) + # followed by a newline as separator to the next field. + if not data.endswith(b"\n"): + raise MalformedBinaryEntryError( + f"Failed parsing binary entry {data}" + ) + + name = name.decode("utf-8") + if name not in formatter_.required_fields: + # we must read to the end of the entry in the stream, so we can + # only continue the loop here + continue + + # strip \n for simple fields before decoding + entries[name] = data[:-1].decode("utf-8") diff --git a/tests/api/test_host.py b/tests/api/test_host.py index 9b6146e6d..8df7cd91a 100644 --- a/tests/api/test_host.py +++ b/tests/api/test_host.py @@ -1,12 +1,13 @@ """Test Host API.""" -from unittest.mock import MagicMock +from unittest.mock import ANY, MagicMock from aiohttp.test_utils import TestClient import pytest from supervisor.coresys import CoreSys from supervisor.dbus.resolved import Resolved +from supervisor.host.const import LogFormat, LogFormatter DEFAULT_RANGE = "entries=:-100:" # pylint: disable=protected-access @@ -154,6 +155,7 @@ async def test_advanced_logs( journald_logs.assert_called_once_with( params={"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers}, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -161,7 +163,9 @@ async def test_advanced_logs( identifier = "dropbear" await api_client.get(f"/host/logs/identifiers/{identifier}") journald_logs.assert_called_once_with( - params={"SYSLOG_IDENTIFIER": identifier}, range_header=DEFAULT_RANGE + params={"SYSLOG_IDENTIFIER": identifier}, + range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -174,6 +178,7 @@ async def test_advanced_logs( "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -182,6 +187,7 @@ async def test_advanced_logs( journald_logs.assert_called_once_with( params={"_BOOT_ID": bootid, "SYSLOG_IDENTIFIER": identifier}, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -191,6 +197,7 @@ async def test_advanced_logs( journald_logs.assert_called_once_with( params={"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers}, range_header=headers["Range"], + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -202,6 +209,7 @@ async def test_advanced_logs( "follow": "", }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) @@ -216,6 +224,7 @@ async def test_advanced_logs_boot_id_offset( "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -227,6 +236,7 @@ async def test_advanced_logs_boot_id_offset( "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -238,11 +248,41 @@ async def test_advanced_logs_boot_id_offset( "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() +async def test_advanced_logs_formatters( + api_client: TestClient, + coresys: CoreSys, + journald_gateway: MagicMock, + journal_logs_reader: MagicMock, +): + """Test advanced logs formatters varying on Accept header.""" + + await api_client.get("/host/logs") + journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE) + + journal_logs_reader.reset_mock() + + headers = {"Accept": "text/x-log"} + await api_client.get("/host/logs", headers=headers) + journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE) + + journal_logs_reader.reset_mock() + + await api_client.get("/host/logs/identifiers/test") + journal_logs_reader.assert_called_once_with(ANY, LogFormatter.PLAIN) + + journal_logs_reader.reset_mock() + + headers = {"Accept": "text/x-log"} + await api_client.get("/host/logs/identifiers/test", headers=headers) + journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE) + + async def test_advanced_logs_errors(api_client: TestClient): """Test advanced logging API errors.""" # coresys = coresys_logs_control @@ -257,5 +297,5 @@ async def test_advanced_logs_errors(api_client: TestClient): assert result["result"] == "error" assert ( result["message"] - == "Invalid content type requested. Only text/plain supported for now." + == "Invalid content type requested. Only text/plain and text/x-log supported for now." ) diff --git a/tests/conftest.py b/tests/conftest.py index 6fdd2f74f..90b15dc6f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ """Common test functions.""" +import asyncio from functools import partial from inspect import unwrap import os @@ -47,12 +48,7 @@ from supervisor.store.addon import AddonStore from supervisor.store.repository import Repository from supervisor.utils.dt import utcnow -from .common import ( - load_binary_fixture, - load_fixture, - load_json_fixture, - mock_dbus_services, -) +from .common import load_binary_fixture, load_json_fixture, mock_dbus_services from .const import TEST_ADDON_SLUG from .dbus_service_mocks.base import DBusServiceMock from .dbus_service_mocks.network_connection_settings import ( @@ -408,10 +404,28 @@ async def journald_gateway() -> MagicMock: with patch("supervisor.host.logs.Path.is_socket", return_value=True), patch( "supervisor.host.logs.ClientSession.get" ) as get: - get.return_value.__aenter__.return_value.text = AsyncMock( - return_value=load_fixture("logs_host.txt") + reader = asyncio.StreamReader(loop=asyncio.get_running_loop()) + + async def response_text(): + return (await reader.read()).decode("utf-8") + + client_response = MagicMock( + content=reader, + text=response_text, ) - yield get + + get.return_value.__aenter__.return_value = client_response + get.return_value.__aenter__.return_value.__aenter__.return_value = ( + client_response + ) + yield reader + + +@pytest.fixture +async def journal_logs_reader() -> MagicMock: + """Mock journal_logs_reader in host API.""" + with patch("supervisor.api.host.journal_logs_reader") as reader: + yield reader @pytest.fixture diff --git a/tests/fixtures/logs_export_host.txt b/tests/fixtures/logs_export_host.txt new file mode 100644 index 000000000..f7223445e --- /dev/null +++ b/tests/fixtures/logs_export_host.txt @@ -0,0 +1,36 @@ +__CURSOR=s=83fee99ca0c3466db5fc120d52ca7dd8;i=203f2ce;b=f5a5c442fa6548cf97474d2d57c920b3;m=3191a3c620;t=612ccd299e7af;x=8675b540119d10bb +__REALTIME_TIMESTAMP=1709520776193967 +__MONOTONIC_TIMESTAMP=212896826912 +__SEQNUM=33813198 +__SEQNUM_ID=83fee99ca0c3466db5fc120d52ca7dd8 +_BOOT_ID=f5a5c442fa6548cf97474d2d57c920b3 +PRIORITY=6 +_TRANSPORT=journal +_UID=0 +_GID=0 +_CAP_EFFECTIVE=1ffffffffff +_MACHINE_ID=edb710b8363b4ff48aed6e75a27f85a6 +_HOSTNAME=homeassistant +_RUNTIME_SCOPE=system +SYSLOG_FACILITY=37 +TID=1 +SYSLOG_IDENTIFIER=systemd +_PID=1 +_COMM=systemd +_EXE=/usr/lib/systemd/systemd +_CMDLINE=/sbin/init +_SYSTEMD_CGROUP=/init.scope +_SYSTEMD_UNIT=init.scope +_SYSTEMD_SLICE=-.slice +CODE_FILE=src/core/job.c +JOB_TYPE=start +UNIT=systemd-hostnamed.service +CODE_LINE=768 +CODE_FUNC=job_emit_done_message +MESSAGE=Started Hostname Service. +JOB_RESULT=done +MESSAGE_ID=39f53479d3a045ac8e11786248231fbf +JOB_ID=7205 +INVOCATION_ID=e38ca30edd074417b011192307f10811 +_SOURCE_REALTIME_TIMESTAMP=1709520776193926 + diff --git a/tests/fixtures/logs_export_supervisor.txt b/tests/fixtures/logs_export_supervisor.txt new file mode 100644 index 0000000000000000000000000000000000000000..c038c9da1072f816a1b61c9499f9c42ab94e0f51 GIT binary patch literal 1327 zcmah}U60~66zy~V!oKj>K0q9Q#P%RRq#-b=NKBKk>ULCFjz1RK0V)Dj`|oR(nSt$A znkY(a-{a$Z?!CTBV1_t3YwLS4Bfxr@IG_{sZw*msp5#!Tdc%q5H z7^=z;sjkWDb|srm{C+qe_xd?ftH2S$7^!s_1bvI^ zFPBv%?7{rujy6)oFNKKD!!ho+`}l1&uV%}#{?;-YKaRY8h~v?CY!3UA-tGg7h>z!e ze4F15Cqpnsl^C^)LNVXr#W|^@6(Vt3-u*& zRhB=7KwQAi_A~)KLn-R9_n2vCQ%8x0)H9+(oF8S|)mc@DjUI2F59dBo&AR?lQEe_) zuRcY&th+3*HtGdE4BLYm&VOEH4u|6%pq#1sdbt#cgzDv96UbPqQ8=7Xn`mx&Y&7BC z2hU+b33I@DgDnb0>q6zz#P>m&{fhD{wmVCgErRR+b*8McMRE1?^Tr$}+e5rL?A-VQ z98P+FiuKN>czoSCzR7ra)+aZdqLAUGT!au7Ar|wpX#VGrTSzX@VkPrO(VH|}WMe%5 z8ioeodC=vk<(wqOP~WE#megAnw+<&;ri6U*