From 4d1a5e2dc2c2e93f0d1e015cec198e7cfcab0a22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20=C4=8Cerm=C3=A1k?= Date: Thu, 29 May 2025 11:41:23 +0200 Subject: [PATCH] Use journal-gatewayd's new /boots endpoint to list boots (#5914) * Use journal-gatewayd's new /boots endpoint to list boots The current method we use for getting boots has several known downsides, for example it can miss some incomplete boots and the performance might be worse than what we could get by using Systemd directly. Systemd was missing a method to list boots through journal-gatewayd, but that should be addressed by the new /boots endpoint added in [1] which returns an application/json-seq response containing all boots as reported in `journalctl --list-boots`. Implement Supervisor methods to parse this format and use the endpoint first, falling back to the old method if it fails. [1] https://github.com/systemd/systemd/pull/37574 * Log info instead of warning when /boots is not present Co-authored-by: Stefan Agner * Split records only by RS instead of LF in journal_boots_reader * Strip only RS, json.loads is fine with whitespace --------- Co-authored-by: Stefan Agner --- supervisor/host/const.py | 1 + supervisor/host/logs.py | 59 ++++++++++++++---- supervisor/utils/systemd_journal.py | 34 +++++++++++ tests/conftest.py | 16 +++++ tests/fixtures/systemd_journal_boots.jsons | 5 ++ tests/host/test_logs.py | 30 +++++++++- tests/utils/test_systemd_journal.py | 70 ++++++++++++++++++++++ 7 files changed, 201 insertions(+), 14 deletions(-) create mode 100644 tests/fixtures/systemd_journal_boots.jsons diff --git a/supervisor/host/const.py b/supervisor/host/const.py index 20ac09c2f..8a97a41aa 100644 --- a/supervisor/host/const.py +++ b/supervisor/host/const.py @@ -80,6 +80,7 @@ class LogFormat(StrEnum): JOURNAL = "application/vnd.fdo.journal" JSON = "application/json" + JSON_SEQ = "application/json-seq" TEXT = "text/plain" diff --git a/supervisor/host/logs.py b/supervisor/host/logs.py index 
0e7acef58..3c1174eac 100644 --- a/supervisor/host/logs.py +++ b/supervisor/host/logs.py @@ -25,6 +25,7 @@ from ..exceptions import ( HostServiceError, ) from ..utils.json import read_json_file +from ..utils.systemd_journal import journal_boots_reader from .const import PARAM_BOOT_ID, PARAM_SYSLOG_IDENTIFIER, LogFormat _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -108,12 +109,8 @@ class LogsControl(CoreSysAttributes): return boot_ids[offset] - async def get_boot_ids(self) -> list[str]: - """Get boot IDs from oldest to newest.""" - if self._boot_ids: - # Doesn't change without a reboot, no reason to query again once cached - return self._boot_ids - + async def _get_boot_ids_legacy(self) -> list[str]: + """Get boots IDs using suboptimal method where /boots is not available.""" try: async with self.journald_logs( params=BOOT_IDS_QUERY, @@ -142,13 +139,51 @@ class LogsControl(CoreSysAttributes): _LOGGER.error, ) from err - self._boot_ids = [] + _boot_ids = [] for entry in text.split("\n"): - if ( - entry - and (boot_id := json.loads(entry)[PARAM_BOOT_ID]) not in self._boot_ids - ): - self._boot_ids.append(boot_id) + if entry and (boot_id := json.loads(entry)[PARAM_BOOT_ID]) not in _boot_ids: + _boot_ids.append(boot_id) + + return _boot_ids + + async def _get_boot_ids_native(self): + """Get boot IDs using /boots endpoint.""" + try: + async with self.journald_logs( + path="/boots", + accept=LogFormat.JSON_SEQ, + timeout=ClientTimeout(total=20), + ) as resp: + if resp.status != 200: + raise HostLogError( + f"Got HTTP {resp.status} from /boots.", + _LOGGER.debug, + ) + # Don't rely solely on the order of boots in the response, + # sort the boots by index returned in the response. 
+ boot_id_tuples = [boot async for boot in journal_boots_reader(resp)] + return [ + boot_id for _, boot_id in sorted(boot_id_tuples, key=lambda x: x[0]) + ] + except (ClientError, TimeoutError) as err: + raise HostLogError( + "Could not get a list of boot IDs from systemd-journal-gatewayd", + _LOGGER.error, + ) from err + + async def get_boot_ids(self) -> list[str]: + """Get boot IDs from oldest to newest.""" + if self._boot_ids: + # Doesn't change without a reboot, no reason to query again once cached + return self._boot_ids + + try: + self._boot_ids = await self._get_boot_ids_native() + except HostLogError: + _LOGGER.info( + "Could not get /boots from systemd-journal-gatewayd, using fallback." + ) + self._boot_ids = await self._get_boot_ids_legacy() return self._boot_ids diff --git a/supervisor/utils/systemd_journal.py b/supervisor/utils/systemd_journal.py index e13b99555..7d442b8b8 100644 --- a/supervisor/utils/systemd_journal.py +++ b/supervisor/utils/systemd_journal.py @@ -1,8 +1,10 @@ """Utilities for working with systemd journal export format.""" +from asyncio import IncompleteReadError from collections.abc import AsyncGenerator from datetime import UTC, datetime from functools import wraps +import json from aiohttp import ClientResponse @@ -116,3 +118,35 @@ async def journal_logs_reader( # strip \n for simple fields before decoding entries[name] = data[:-1].decode("utf-8") + + +def _parse_boot_json(boot_json_bytes: bytes) -> tuple[int, str]: + boot_dict = json.loads(boot_json_bytes.decode("utf-8")) + return ( + int(boot_dict["index"]), + boot_dict["boot_id"], + ) + + +async def journal_boots_reader( + response: ClientResponse, +) -> AsyncGenerator[tuple[int, str]]: + """Read boots from json-seq response from systemd journal gateway. + + Returns generator of (index, boot_id) tuples. 
+ """ + async with response as resp: + + async def read_record() -> bytes: + try: + return await resp.content.readuntil(b"\x1e") + except IncompleteReadError as e: + return e.partial + + while line := await read_record(): + line = line.strip(b"\x1e") + if not line: + continue + yield _parse_boot_json(line) + + return diff --git a/tests/conftest.py b/tests/conftest.py index e8902e7cd..6d74854cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -47,6 +47,7 @@ from supervisor.coresys import CoreSys from supervisor.dbus.network import NetworkManager from supervisor.docker.manager import DockerAPI from supervisor.docker.monitor import DockerMonitor +from supervisor.exceptions import HostLogError from supervisor.homeassistant.api import APIState from supervisor.host.logs import LogsControl from supervisor.os.manager import OSManager @@ -463,6 +464,7 @@ async def journald_gateway() -> AsyncGenerator[MagicMock]: return (await client_response.content.read()).decode("utf-8") client_response.text = response_text + client_response.status = 200 get.return_value.__aenter__.return_value = client_response get.return_value.__aenter__.return_value.__aenter__.return_value = ( @@ -471,6 +473,20 @@ async def journald_gateway() -> AsyncGenerator[MagicMock]: yield client_response +@pytest.fixture +async def without_journal_gatewayd_boots() -> AsyncGenerator[MagicMock]: + """Make method using /boots of systemd-journald-gateway fail.""" + + def raise_host_log_error_side_effect(*args, **kwargs): + raise HostLogError("Mocked error") + + with patch( + "supervisor.host.logs.LogsControl._get_boot_ids_native" + ) as get_boot_ids_native: + get_boot_ids_native.side_effect = raise_host_log_error_side_effect + yield get_boot_ids_native + + @pytest.fixture async def journal_logs_reader() -> MagicMock: """Mock journal_logs_reader in host API.""" diff --git a/tests/fixtures/systemd_journal_boots.jsons b/tests/fixtures/systemd_journal_boots.jsons new file mode 100644 index 000000000..dd42b492e 
--- /dev/null +++ b/tests/fixtures/systemd_journal_boots.jsons @@ -0,0 +1,5 @@ +{"index":0,"boot_id":"aeaa3d67efe9410ba7210cbee21bb022","first_entry":1748034681901673,"last_entry":1748266575896080} +{"index":-1,"boot_id":"b81a432efb2d4a71a1e2d1b42892c187","first_entry":1748029923668450,"last_entry":1748034525932010} +{"index":-2,"boot_id":"3fa78d16a4ee4975b632d24dea52404c","first_entry":1748029753658977,"last_entry":1748029822011659} +{"index":-3,"boot_id":"2411cf84b38a41939de746afc7a22e18","first_entry":1748029538004767,"last_entry":1748029695950467} +{"index":-4,"boot_id":"77f66b1fd6b2416e8eebf509c9a470e6","first_entry":1748029186112301,"last_entry":1748029453598350} \ No newline at end of file diff --git a/tests/host/test_logs.py b/tests/host/test_logs.py index 19cbb187a..d80eef0b6 100644 --- a/tests/host/test_logs.py +++ b/tests/host/test_logs.py @@ -20,6 +20,14 @@ TEST_BOOT_IDS = [ "b1c386a144fd44db8f855d7e907256f8", ] +TEST_BOOTS_IDS_NATIVE = [ + "77f66b1fd6b2416e8eebf509c9a470e6", + "2411cf84b38a41939de746afc7a22e18", + "3fa78d16a4ee4975b632d24dea52404c", + "b81a432efb2d4a71a1e2d1b42892c187", + "aeaa3d67efe9410ba7210cbee21bb022", +] + async def test_load(coresys: CoreSys): """Test load.""" @@ -82,7 +90,11 @@ async def test_logs_coloured(journald_gateway: MagicMock, coresys: CoreSys): ) -async def test_boot_ids(journald_gateway: MagicMock, coresys: CoreSys): +async def test_boot_ids( + journald_gateway: MagicMock, + coresys: CoreSys, + without_journal_gatewayd_boots: MagicMock, +): """Test getting boot ids.""" journald_gateway.content.feed_data( load_fixture("logs_boot_ids.txt").encode("utf-8") @@ -109,7 +121,11 @@ async def test_boot_ids(journald_gateway: MagicMock, coresys: CoreSys): await coresys.host.logs.get_boot_id(3) -async def test_boot_ids_fallback(journald_gateway: MagicMock, coresys: CoreSys): +async def test_boot_ids_legacy_fallback( + journald_gateway: MagicMock, + coresys: CoreSys, + without_journal_gatewayd_boots: MagicMock, +): """Test getting 
boot ids using fallback.""" # Initial response has no log lines journald_gateway.content.feed_data(b"") @@ -134,6 +150,16 @@ async def test_boot_ids_fallback(journald_gateway: MagicMock, coresys: CoreSys): ] +async def test_boot_ids_native(journald_gateway: MagicMock, coresys: CoreSys): + """Test getting boot ids from /boots endpoint.""" + journald_gateway.content.feed_data( + load_fixture("systemd_journal_boots.jsons").encode("utf-8") + ) + journald_gateway.content.feed_eof() + + assert await coresys.host.logs.get_boot_ids() == TEST_BOOTS_IDS_NATIVE + + async def test_identifiers(journald_gateway: MagicMock, coresys: CoreSys): """Test getting identifiers.""" journald_gateway.content.feed_data( diff --git a/tests/utils/test_systemd_journal.py b/tests/utils/test_systemd_journal.py index 7d683c027..440105aff 100644 --- a/tests/utils/test_systemd_journal.py +++ b/tests/utils/test_systemd_journal.py @@ -8,6 +8,7 @@ import pytest from supervisor.exceptions import MalformedBinaryEntryError from supervisor.host.const import LogFormatter from supervisor.utils.systemd_journal import ( + journal_boots_reader, journal_logs_reader, journal_plain_formatter, journal_verbose_formatter, @@ -205,3 +206,72 @@ async def test_parsing_colored_supervisor_logs(): line == "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m" ) + + +async def test_parsing_boots(): + """Test parsing of boots.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}\x0a' + b'\x1e{"index":-1,"boot_id":"2087f5f269724a48852c92a2e663fb94","first_entry":1748012078520355,"last_entry":1748023322834353}\x0a' + b'\x1e{"index":-2,"boot_id":"865a4aa6128e4747917047c09f400d0d","first_entry":1748011941404183,"last_entry":1748012025742472}' + ) + stream.feed_eof() + + boots = [] + async for index, boot_id in journal_boots_reader(journal_logs): + 
boots.append((index, boot_id)) + + assert boots == [ + (0, "e9ba6e5d9bc745c591686a502e3ed817"), + (-1, "2087f5f269724a48852c92a2e663fb94"), + (-2, "865a4aa6128e4747917047c09f400d0d"), + ] + + +async def test_parsing_boots_no_lf(): + """Test parsing of boots without LF separator (only RS).""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}' + b'\x1e{"index":-1,"boot_id":"2087f5f269724a48852c92a2e663fb94","first_entry":1748012078520355,"last_entry":1748023322834353}' + b'\x1e{"index":-2,"boot_id":"865a4aa6128e4747917047c09f400d0d","first_entry":1748011941404183,"last_entry":1748012025742472}' + ) + stream.feed_eof() + + boots = [] + async for index, boot_id in journal_boots_reader(journal_logs): + boots.append((index, boot_id)) + + assert boots == [ + (0, "e9ba6e5d9bc745c591686a502e3ed817"), + (-1, "2087f5f269724a48852c92a2e663fb94"), + (-2, "865a4aa6128e4747917047c09f400d0d"), + ] + + +async def test_parsing_boots_single(): + """Test parsing of single boot with trailing LF.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}\x0a' + ) + stream.feed_eof() + + boots = [] + async for index, boot_id in journal_boots_reader(journal_logs): + boots.append((index, boot_id)) + + assert boots == [(0, "e9ba6e5d9bc745c591686a502e3ed817")] + + +async def test_parsing_boots_none(): + """Test parsing of empty boot response.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_eof() + + boots = [] + async for index, boot_id in journal_boots_reader(journal_logs): + boots.append((index, boot_id)) + + assert boots == []