diff --git a/supervisor/host/const.py b/supervisor/host/const.py index 20ac09c2f..8a97a41aa 100644 --- a/supervisor/host/const.py +++ b/supervisor/host/const.py @@ -80,6 +80,7 @@ class LogFormat(StrEnum): JOURNAL = "application/vnd.fdo.journal" JSON = "application/json" + JSON_SEQ = "application/json-seq" TEXT = "text/plain" diff --git a/supervisor/host/logs.py b/supervisor/host/logs.py index 0e7acef58..3c1174eac 100644 --- a/supervisor/host/logs.py +++ b/supervisor/host/logs.py @@ -25,6 +25,7 @@ from ..exceptions import ( HostServiceError, ) from ..utils.json import read_json_file +from ..utils.systemd_journal import journal_boots_reader from .const import PARAM_BOOT_ID, PARAM_SYSLOG_IDENTIFIER, LogFormat _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -108,12 +109,8 @@ class LogsControl(CoreSysAttributes): return boot_ids[offset] - async def get_boot_ids(self) -> list[str]: - """Get boot IDs from oldest to newest.""" - if self._boot_ids: - # Doesn't change without a reboot, no reason to query again once cached - return self._boot_ids - + async def _get_boot_ids_legacy(self) -> list[str]: + """Get boots IDs using suboptimal method where /boots is not available.""" try: async with self.journald_logs( params=BOOT_IDS_QUERY, @@ -142,13 +139,51 @@ class LogsControl(CoreSysAttributes): _LOGGER.error, ) from err - self._boot_ids = [] + _boot_ids = [] for entry in text.split("\n"): - if ( - entry - and (boot_id := json.loads(entry)[PARAM_BOOT_ID]) not in self._boot_ids - ): - self._boot_ids.append(boot_id) + if entry and (boot_id := json.loads(entry)[PARAM_BOOT_ID]) not in _boot_ids: + _boot_ids.append(boot_id) + + return _boot_ids + + async def _get_boot_ids_native(self): + """Get boot IDs using /boots endpoint.""" + try: + async with self.journald_logs( + path="/boots", + accept=LogFormat.JSON_SEQ, + timeout=ClientTimeout(total=20), + ) as resp: + if resp.status != 200: + raise HostLogError( + f"Got HTTP {resp.status} from /boots.", + _LOGGER.debug, + ) + # Don't rely solely on the order of boots in the response, + # sort the boots by index returned in the response. + boot_id_tuples = [boot async for boot in journal_boots_reader(resp)] + return [ + boot_id for _, boot_id in sorted(boot_id_tuples, key=lambda x: x[0]) + ] + except (ClientError, TimeoutError) as err: + raise HostLogError( + "Could not get a list of boot IDs from systemd-journal-gatewayd", + _LOGGER.error, + ) from err + + async def get_boot_ids(self) -> list[str]: + """Get boot IDs from oldest to newest.""" + if self._boot_ids: + # Doesn't change without a reboot, no reason to query again once cached + return self._boot_ids + + try: + self._boot_ids = await self._get_boot_ids_native() + except HostLogError: + _LOGGER.info( + "Could not get /boots from systemd-journal-gatewayd, using fallback." + ) + self._boot_ids = await self._get_boot_ids_legacy() return self._boot_ids diff --git a/supervisor/utils/systemd_journal.py b/supervisor/utils/systemd_journal.py index e13b99555..7d442b8b8 100644 --- a/supervisor/utils/systemd_journal.py +++ b/supervisor/utils/systemd_journal.py @@ -1,8 +1,10 @@ """Utilities for working with systemd journal export format.""" +from asyncio import IncompleteReadError from collections.abc import AsyncGenerator from datetime import UTC, datetime from functools import wraps +import json from aiohttp import ClientResponse @@ -116,3 +118,35 @@ async def journal_logs_reader( # strip \n for simple fields before decoding entries[name] = data[:-1].decode("utf-8") + + +def _parse_boot_json(boot_json_bytes: bytes) -> tuple[int, str]: + boot_dict = json.loads(boot_json_bytes.decode("utf-8")) + return ( + int(boot_dict["index"]), + boot_dict["boot_id"], + ) + + +async def journal_boots_reader( + response: ClientResponse, +) -> AsyncGenerator[tuple[int, str]]: + """Read boots from json-seq response from systemd journal gateway. + + Returns generator of (index, boot_id) tuples. + """ + async with response as resp: + + async def read_record() -> bytes: + try: + return await resp.content.readuntil(b"\x1e") + except IncompleteReadError as e: + return e.partial + + while line := await read_record(): + line = line.strip(b"\x1e") + if not line: + continue + yield _parse_boot_json(line) + + return diff --git a/tests/conftest.py b/tests/conftest.py index e8902e7cd..6d74854cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -47,6 +47,7 @@ from supervisor.coresys import CoreSys from supervisor.dbus.network import NetworkManager from supervisor.docker.manager import DockerAPI from supervisor.docker.monitor import DockerMonitor +from supervisor.exceptions import HostLogError from supervisor.homeassistant.api import APIState from supervisor.host.logs import LogsControl from supervisor.os.manager import OSManager @@ -463,6 +464,7 @@ async def journald_gateway() -> AsyncGenerator[MagicMock]: return (await client_response.content.read()).decode("utf-8") client_response.text = response_text + client_response.status = 200 get.return_value.__aenter__.return_value = client_response get.return_value.__aenter__.return_value.__aenter__.return_value = ( @@ -471,6 +473,20 @@ async def journald_gateway() -> AsyncGenerator[MagicMock]: yield client_response +@pytest.fixture +async def without_journal_gatewayd_boots() -> AsyncGenerator[MagicMock]: + """Make method using /boots of systemd-journald-gateway fail.""" + + def raise_host_log_error_side_effect(*args, **kwargs): + raise HostLogError("Mocked error") + + with patch( + "supervisor.host.logs.LogsControl._get_boot_ids_native" + ) as get_boot_ids_native: + get_boot_ids_native.side_effect = raise_host_log_error_side_effect + yield get_boot_ids_native + + @pytest.fixture async def journal_logs_reader() -> MagicMock: """Mock journal_logs_reader in host API.""" diff --git a/tests/fixtures/systemd_journal_boots.jsons b/tests/fixtures/systemd_journal_boots.jsons new file mode 100644 index 000000000..dd42b492e --- /dev/null +++ b/tests/fixtures/systemd_journal_boots.jsons @@ -0,0 +1,5 @@ +{"index":0,"boot_id":"aeaa3d67efe9410ba7210cbee21bb022","first_entry":1748034681901673,"last_entry":1748266575896080} +{"index":-1,"boot_id":"b81a432efb2d4a71a1e2d1b42892c187","first_entry":1748029923668450,"last_entry":1748034525932010} +{"index":-2,"boot_id":"3fa78d16a4ee4975b632d24dea52404c","first_entry":1748029753658977,"last_entry":1748029822011659} +{"index":-3,"boot_id":"2411cf84b38a41939de746afc7a22e18","first_entry":1748029538004767,"last_entry":1748029695950467} +{"index":-4,"boot_id":"77f66b1fd6b2416e8eebf509c9a470e6","first_entry":1748029186112301,"last_entry":1748029453598350} \ No newline at end of file diff --git a/tests/host/test_logs.py b/tests/host/test_logs.py index 19cbb187a..d80eef0b6 100644 --- a/tests/host/test_logs.py +++ b/tests/host/test_logs.py @@ -20,6 +20,14 @@ TEST_BOOT_IDS = [ "b1c386a144fd44db8f855d7e907256f8", ] +TEST_BOOTS_IDS_NATIVE = [ + "77f66b1fd6b2416e8eebf509c9a470e6", + "2411cf84b38a41939de746afc7a22e18", + "3fa78d16a4ee4975b632d24dea52404c", + "b81a432efb2d4a71a1e2d1b42892c187", + "aeaa3d67efe9410ba7210cbee21bb022", +] + async def test_load(coresys: CoreSys): """Test load.""" @@ -82,7 +90,11 @@ async def test_logs_coloured(journald_gateway: MagicMock, coresys: CoreSys): ) -async def test_boot_ids(journald_gateway: MagicMock, coresys: CoreSys): +async def test_boot_ids( + journald_gateway: MagicMock, + coresys: CoreSys, + without_journal_gatewayd_boots: MagicMock, +): """Test getting boot ids.""" journald_gateway.content.feed_data( load_fixture("logs_boot_ids.txt").encode("utf-8") @@ -109,7 +121,11 @@ async def test_boot_ids(journald_gateway: MagicMock, coresys: CoreSys): await coresys.host.logs.get_boot_id(3) -async def test_boot_ids_fallback(journald_gateway: MagicMock, coresys: CoreSys): +async def test_boot_ids_legacy_fallback( + journald_gateway: MagicMock, + coresys: CoreSys, + without_journal_gatewayd_boots: MagicMock, +): """Test getting boot ids using fallback.""" # Initial response has no log lines journald_gateway.content.feed_data(b"") @@ -134,6 +150,16 @@ async def test_boot_ids_fallback(journald_gateway: MagicMock, coresys: CoreSys): ] +async def test_boot_ids_native(journald_gateway: MagicMock, coresys: CoreSys): + """Test getting boot ids from /boots endpoint.""" + journald_gateway.content.feed_data( + load_fixture("systemd_journal_boots.jsons").encode("utf-8") + ) + journald_gateway.content.feed_eof() + + assert await coresys.host.logs.get_boot_ids() == TEST_BOOTS_IDS_NATIVE + + async def test_identifiers(journald_gateway: MagicMock, coresys: CoreSys): """Test getting identifiers.""" journald_gateway.content.feed_data( diff --git a/tests/utils/test_systemd_journal.py b/tests/utils/test_systemd_journal.py index 7d683c027..440105aff 100644 --- a/tests/utils/test_systemd_journal.py +++ b/tests/utils/test_systemd_journal.py @@ -8,6 +8,7 @@ import pytest from supervisor.exceptions import MalformedBinaryEntryError from supervisor.host.const import LogFormatter from supervisor.utils.systemd_journal import ( + journal_boots_reader, journal_logs_reader, journal_plain_formatter, journal_verbose_formatter, @@ -205,3 +206,72 @@ async def test_parsing_colored_supervisor_logs(): line == "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m" ) + + +async def test_parsing_boots(): + """Test parsing of boots.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}\x0a' + b'\x1e{"index":-1,"boot_id":"2087f5f269724a48852c92a2e663fb94","first_entry":1748012078520355,"last_entry":1748023322834353}\x0a' + b'\x1e{"index":-2,"boot_id":"865a4aa6128e4747917047c09f400d0d","first_entry":1748011941404183,"last_entry":1748012025742472}' + ) + stream.feed_eof() + + boots = [] + async for index, boot_id in journal_boots_reader(journal_logs): + boots.append((index, boot_id)) + + assert boots == [ + (0, "e9ba6e5d9bc745c591686a502e3ed817"), + (-1, "2087f5f269724a48852c92a2e663fb94"), + (-2, "865a4aa6128e4747917047c09f400d0d"), + ] + + +async def test_parsing_boots_no_lf(): + """Test parsing of boots without LF separator (only RS).""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}' + b'\x1e{"index":-1,"boot_id":"2087f5f269724a48852c92a2e663fb94","first_entry":1748012078520355,"last_entry":1748023322834353}' + b'\x1e{"index":-2,"boot_id":"865a4aa6128e4747917047c09f400d0d","first_entry":1748011941404183,"last_entry":1748012025742472}' + ) + stream.feed_eof() + + boots = [] + async for index, boot_id in journal_boots_reader(journal_logs): + boots.append((index, boot_id)) + + assert boots == [ + (0, "e9ba6e5d9bc745c591686a502e3ed817"), + (-1, "2087f5f269724a48852c92a2e663fb94"), + (-2, "865a4aa6128e4747917047c09f400d0d"), + ] + + +async def test_parsing_boots_single(): + """Test parsing of single boot with trailing LF.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}\x0a' + ) + stream.feed_eof() + + boots = [] + async for index, boot_id in journal_boots_reader(journal_logs): + boots.append((index, boot_id)) + + assert boots == [(0, "e9ba6e5d9bc745c591686a502e3ed817")] + + +async def test_parsing_boots_none(): + """Test parsing of empty boot response.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_eof() + + boots = [] + async for index, boot_id in journal_boots_reader(journal_logs): + boots.append((index, boot_id)) + + assert boots == []