Use journal-gatewayd's new /boots endpoint to list boots (#5914)

* Use journal-gatewayd's new /boots endpoint to list boots

The current method we use for getting boots has several known downsides, for
example it can miss some incomplete boots and the performance might be
worse than what we could get by using Systemd directly. Systemd was
missing a method to list boots through journal-gatewayd, but that
should be addressed by the new /boots endpoint added in [1], which
returns an application/json-seq response containing all boots as reported
by `journalctl --list-boots`.

Implement Supervisor methods to parse this format and try the endpoint
first, falling back to the old method if it fails.

[1] https://github.com/systemd/systemd/pull/37574

* Log info instead of warning when /boots is not present

Co-authored-by: Stefan Agner <stefan@agner.ch>

* Split records only by RS instead of LF in journal_boots_reader

* Strip only RS, json.loads is fine with whitespace

---------

Co-authored-by: Stefan Agner <stefan@agner.ch>
This commit is contained in:
Jan Čermák 2025-05-29 11:41:23 +02:00 committed by GitHub
parent 705e76abe3
commit 4d1a5e2dc2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 201 additions and 14 deletions

View File

@ -80,6 +80,7 @@ class LogFormat(StrEnum):
JOURNAL = "application/vnd.fdo.journal"
JSON = "application/json"
JSON_SEQ = "application/json-seq"
TEXT = "text/plain"

View File

@ -25,6 +25,7 @@ from ..exceptions import (
HostServiceError,
)
from ..utils.json import read_json_file
from ..utils.systemd_journal import journal_boots_reader
from .const import PARAM_BOOT_ID, PARAM_SYSLOG_IDENTIFIER, LogFormat
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -108,12 +109,8 @@ class LogsControl(CoreSysAttributes):
return boot_ids[offset]
async def get_boot_ids(self) -> list[str]:
"""Get boot IDs from oldest to newest."""
if self._boot_ids:
# Doesn't change without a reboot, no reason to query again once cached
return self._boot_ids
async def _get_boot_ids_legacy(self) -> list[str]:
"""Get boots IDs using suboptimal method where /boots is not available."""
try:
async with self.journald_logs(
params=BOOT_IDS_QUERY,
@ -142,13 +139,51 @@ class LogsControl(CoreSysAttributes):
_LOGGER.error,
) from err
self._boot_ids = []
_boot_ids = []
for entry in text.split("\n"):
if (
entry
and (boot_id := json.loads(entry)[PARAM_BOOT_ID]) not in self._boot_ids
):
self._boot_ids.append(boot_id)
if entry and (boot_id := json.loads(entry)[PARAM_BOOT_ID]) not in _boot_ids:
_boot_ids.append(boot_id)
return _boot_ids
async def _get_boot_ids_native(self) -> list[str]:
    """Get boot IDs using /boots endpoint.

    Returns the boot IDs ordered by the ascending index reported for each
    boot in the response.

    Raises HostLogError when the endpoint does not answer with HTTP 200,
    when the request fails or times out, or when the response cannot be
    parsed, so that the caller can fall back to another method.
    """
    try:
        async with self.journald_logs(
            path="/boots",
            accept=LogFormat.JSON_SEQ,
            timeout=ClientTimeout(total=20),
        ) as resp:
            if resp.status != 200:
                raise HostLogError(
                    f"Got HTTP {resp.status} from /boots.",
                    _LOGGER.debug,
                )
            # Don't rely solely on the order of boots in the response,
            # sort the boots by index returned in the response.
            boot_id_tuples = [boot async for boot in journal_boots_reader(resp)]
            return [
                boot_id for _, boot_id in sorted(boot_id_tuples, key=lambda x: x[0])
            ]
    except (ClientError, TimeoutError) as err:
        raise HostLogError(
            "Could not get a list of boot IDs from systemd-journal-gatewayd",
            _LOGGER.error,
        ) from err
    except (KeyError, TypeError, ValueError) as err:
        # A malformed record (invalid JSON or UTF-8, missing key, non-integer
        # index) must surface as HostLogError as well, otherwise the caller's
        # fallback is bypassed.
        raise HostLogError(
            "Could not parse /boots response from systemd-journal-gatewayd",
            _LOGGER.debug,
        ) from err
async def get_boot_ids(self) -> list[str]:
    """Return boot IDs ordered from oldest to newest.

    The /boots based lookup is tried first; if it raises HostLogError the
    legacy lookup is used instead. A non-empty result is cached on the
    instance and returned directly on later calls.
    """
    # The list doesn't change without a reboot, so only query while the
    # cache is still empty.
    if not self._boot_ids:
        try:
            boot_ids = await self._get_boot_ids_native()
        except HostLogError:
            _LOGGER.info(
                "Could not get /boots from systemd-journal-gatewayd, using fallback."
            )
            boot_ids = await self._get_boot_ids_legacy()
        self._boot_ids = boot_ids

    return self._boot_ids

View File

@ -1,8 +1,10 @@
"""Utilities for working with systemd journal export format."""
from asyncio import IncompleteReadError
from collections.abc import AsyncGenerator
from datetime import UTC, datetime
from functools import wraps
import json
from aiohttp import ClientResponse
@ -116,3 +118,35 @@ async def journal_logs_reader(
# strip \n for simple fields before decoding
entries[name] = data[:-1].decode("utf-8")
def _parse_boot_json(boot_json_bytes: bytes) -> tuple[int, str]:
    """Parse one UTF-8 encoded JSON boot record into (index, boot_id)."""
    record = json.loads(boot_json_bytes.decode("utf-8"))
    index = int(record["index"])
    boot_id = record["boot_id"]
    return index, boot_id
async def journal_boots_reader(
    response: ClientResponse,
) -> AsyncGenerator[tuple[int, str]]:
    """Read boots from json-seq response from systemd journal gateway.

    Records are split on the RS (0x1E) byte only; any other whitespace
    around a record is left for the JSON parser to handle.

    Yields (index, boot_id) tuples in the order the records are read.
    """
    record_separator = b"\x1e"

    async with response as resp:
        while True:
            try:
                chunk = await resp.content.readuntil(record_separator)
            except IncompleteReadError as err:
                # No further separator was found; use whatever was read.
                chunk = err.partial

            if not chunk:
                # Nothing more to read from the stream.
                break

            record = chunk.strip(record_separator)
            if record:
                yield _parse_boot_json(record)

View File

@ -47,6 +47,7 @@ from supervisor.coresys import CoreSys
from supervisor.dbus.network import NetworkManager
from supervisor.docker.manager import DockerAPI
from supervisor.docker.monitor import DockerMonitor
from supervisor.exceptions import HostLogError
from supervisor.homeassistant.api import APIState
from supervisor.host.logs import LogsControl
from supervisor.os.manager import OSManager
@ -463,6 +464,7 @@ async def journald_gateway() -> AsyncGenerator[MagicMock]:
return (await client_response.content.read()).decode("utf-8")
client_response.text = response_text
client_response.status = 200
get.return_value.__aenter__.return_value = client_response
get.return_value.__aenter__.return_value.__aenter__.return_value = (
@ -471,6 +473,20 @@ async def journald_gateway() -> AsyncGenerator[MagicMock]:
yield client_response
@pytest.fixture
async def without_journal_gatewayd_boots() -> AsyncGenerator[MagicMock]:
    """Make the /boots based boot ID lookup of LogsControl fail.

    Patches LogsControl._get_boot_ids_native so that every call raises
    HostLogError, and yields the patched mock.
    """

    def _always_fail(*args, **kwargs):
        raise HostLogError("Mocked error")

    with patch(
        "supervisor.host.logs.LogsControl._get_boot_ids_native",
        side_effect=_always_fail,
    ) as native_lookup:
        yield native_lookup
@pytest.fixture
async def journal_logs_reader() -> MagicMock:
"""Mock journal_logs_reader in host API."""

View File

@ -0,0 +1,5 @@
{"index":0,"boot_id":"aeaa3d67efe9410ba7210cbee21bb022","first_entry":1748034681901673,"last_entry":1748266575896080}
{"index":-1,"boot_id":"b81a432efb2d4a71a1e2d1b42892c187","first_entry":1748029923668450,"last_entry":1748034525932010}
{"index":-2,"boot_id":"3fa78d16a4ee4975b632d24dea52404c","first_entry":1748029753658977,"last_entry":1748029822011659}
{"index":-3,"boot_id":"2411cf84b38a41939de746afc7a22e18","first_entry":1748029538004767,"last_entry":1748029695950467}
{"index":-4,"boot_id":"77f66b1fd6b2416e8eebf509c9a470e6","first_entry":1748029186112301,"last_entry":1748029453598350}

View File

@ -20,6 +20,14 @@ TEST_BOOT_IDS = [
"b1c386a144fd44db8f855d7e907256f8",
]
TEST_BOOTS_IDS_NATIVE = [
"77f66b1fd6b2416e8eebf509c9a470e6",
"2411cf84b38a41939de746afc7a22e18",
"3fa78d16a4ee4975b632d24dea52404c",
"b81a432efb2d4a71a1e2d1b42892c187",
"aeaa3d67efe9410ba7210cbee21bb022",
]
async def test_load(coresys: CoreSys):
"""Test load."""
@ -82,7 +90,11 @@ async def test_logs_coloured(journald_gateway: MagicMock, coresys: CoreSys):
)
async def test_boot_ids(journald_gateway: MagicMock, coresys: CoreSys):
async def test_boot_ids(
journald_gateway: MagicMock,
coresys: CoreSys,
without_journal_gatewayd_boots: MagicMock,
):
"""Test getting boot ids."""
journald_gateway.content.feed_data(
load_fixture("logs_boot_ids.txt").encode("utf-8")
@ -109,7 +121,11 @@ async def test_boot_ids(journald_gateway: MagicMock, coresys: CoreSys):
await coresys.host.logs.get_boot_id(3)
async def test_boot_ids_fallback(journald_gateway: MagicMock, coresys: CoreSys):
async def test_boot_ids_legacy_fallback(
journald_gateway: MagicMock,
coresys: CoreSys,
without_journal_gatewayd_boots: MagicMock,
):
"""Test getting boot ids using fallback."""
# Initial response has no log lines
journald_gateway.content.feed_data(b"")
@ -134,6 +150,16 @@ async def test_boot_ids_fallback(journald_gateway: MagicMock, coresys: CoreSys):
]
async def test_boot_ids_native(journald_gateway: MagicMock, coresys: CoreSys):
    """Test getting boot ids via the /boots endpoint response fixture."""
    payload = load_fixture("systemd_journal_boots.jsons").encode("utf-8")
    journald_gateway.content.feed_data(payload)
    journald_gateway.content.feed_eof()

    boot_ids = await coresys.host.logs.get_boot_ids()

    assert boot_ids == TEST_BOOTS_IDS_NATIVE
async def test_identifiers(journald_gateway: MagicMock, coresys: CoreSys):
"""Test getting identifiers."""
journald_gateway.content.feed_data(

View File

@ -8,6 +8,7 @@ import pytest
from supervisor.exceptions import MalformedBinaryEntryError
from supervisor.host.const import LogFormatter
from supervisor.utils.systemd_journal import (
journal_boots_reader,
journal_logs_reader,
journal_plain_formatter,
journal_verbose_formatter,
@ -205,3 +206,72 @@ async def test_parsing_colored_supervisor_logs():
line
== "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m"
)
async def test_parsing_boots():
    """Test parsing of boots where records start with RS and end with LF."""
    journal_logs, stream = _journal_logs_mock()
    payload = (
        b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}\x0a'
        b'\x1e{"index":-1,"boot_id":"2087f5f269724a48852c92a2e663fb94","first_entry":1748012078520355,"last_entry":1748023322834353}\x0a'
        b'\x1e{"index":-2,"boot_id":"865a4aa6128e4747917047c09f400d0d","first_entry":1748011941404183,"last_entry":1748012025742472}'
    )
    stream.feed_data(payload)
    stream.feed_eof()

    parsed = [
        (index, boot_id)
        async for index, boot_id in journal_boots_reader(journal_logs)
    ]

    expected = [
        (0, "e9ba6e5d9bc745c591686a502e3ed817"),
        (-1, "2087f5f269724a48852c92a2e663fb94"),
        (-2, "865a4aa6128e4747917047c09f400d0d"),
    ]
    assert parsed == expected
async def test_parsing_boots_no_lf():
    """Test parsing of boots separated by RS only, with no LF in between."""
    journal_logs, stream = _journal_logs_mock()
    payload = (
        b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}'
        b'\x1e{"index":-1,"boot_id":"2087f5f269724a48852c92a2e663fb94","first_entry":1748012078520355,"last_entry":1748023322834353}'
        b'\x1e{"index":-2,"boot_id":"865a4aa6128e4747917047c09f400d0d","first_entry":1748011941404183,"last_entry":1748012025742472}'
    )
    stream.feed_data(payload)
    stream.feed_eof()

    parsed = [
        (index, boot_id)
        async for index, boot_id in journal_boots_reader(journal_logs)
    ]

    expected = [
        (0, "e9ba6e5d9bc745c591686a502e3ed817"),
        (-1, "2087f5f269724a48852c92a2e663fb94"),
        (-2, "865a4aa6128e4747917047c09f400d0d"),
    ]
    assert parsed == expected
async def test_parsing_boots_single():
    """Test parsing of a response holding one boot record with trailing LF."""
    journal_logs, stream = _journal_logs_mock()
    payload = b'\x1e{"index":0,"boot_id":"e9ba6e5d9bc745c591686a502e3ed817","first_entry":1748251653514247,"last_entry":1748258595644563}\x0a'
    stream.feed_data(payload)
    stream.feed_eof()

    parsed = [
        (index, boot_id)
        async for index, boot_id in journal_boots_reader(journal_logs)
    ]

    assert parsed == [(0, "e9ba6e5d9bc745c591686a502e3ed817")]
async def test_parsing_boots_none():
    """Test that a response with no data yields no boots."""
    journal_logs, stream = _journal_logs_mock()
    # EOF is signalled without feeding any data first.
    stream.feed_eof()

    parsed = [
        (index, boot_id)
        async for index, boot_id in journal_boots_reader(journal_logs)
    ]

    assert parsed == []