diff --git a/supervisor/api/const.py b/supervisor/api/const.py index e684a2a0d..80fff328a 100644 --- a/supervisor/api/const.py +++ b/supervisor/api/const.py @@ -8,6 +8,7 @@ CONTENT_TYPE_PNG = "image/png" CONTENT_TYPE_TAR = "application/tar" CONTENT_TYPE_TEXT = "text/plain" CONTENT_TYPE_URL = "application/x-www-form-urlencoded" +CONTENT_TYPE_X_LOG = "text/x-log" COOKIE_INGRESS = "ingress_session" diff --git a/supervisor/api/host.py b/supervisor/api/host.py index 60a39500b..281f1cfa3 100644 --- a/supervisor/api/host.py +++ b/supervisor/api/host.py @@ -28,7 +28,14 @@ from ..const import ( ) from ..coresys import CoreSysAttributes from ..exceptions import APIError, HostLogError -from ..host.const import PARAM_BOOT_ID, PARAM_FOLLOW, PARAM_SYSLOG_IDENTIFIER +from ..host.const import ( + PARAM_BOOT_ID, + PARAM_FOLLOW, + PARAM_SYSLOG_IDENTIFIER, + LogFormat, + LogFormatter, +) +from ..utils.systemd_journal import journal_logs_reader from .const import ( ATTR_AGENT_VERSION, ATTR_APPARMOR_VERSION, @@ -43,6 +50,7 @@ from .const import ( ATTR_STARTUP_TIME, ATTR_USE_NTP, CONTENT_TYPE_TEXT, + CONTENT_TYPE_X_LOG, ) from .utils import api_process, api_validate @@ -158,6 +166,7 @@ class APIHost(CoreSysAttributes): self, request: web.Request, identifier: str | None = None, follow: bool = False ) -> web.StreamResponse: """Return systemd-journald logs.""" + log_formatter = LogFormatter.PLAIN params = {} if identifier: params[PARAM_SYSLOG_IDENTIFIER] = identifier @@ -165,6 +174,8 @@ class APIHost(CoreSysAttributes): params[PARAM_SYSLOG_IDENTIFIER] = request.match_info.get(IDENTIFIER) else: params[PARAM_SYSLOG_IDENTIFIER] = self.sys_host.logs.default_identifiers + # host logs should be always verbose, no matter what Accept header is used + log_formatter = LogFormatter.VERBOSE if BOOTID in request.match_info: params[PARAM_BOOT_ID] = await self._get_boot_id( @@ -175,26 +186,31 @@ class APIHost(CoreSysAttributes): if ACCEPT in request.headers and request.headers[ACCEPT] not in [ CONTENT_TYPE_TEXT, + CONTENT_TYPE_X_LOG, "*/*", ]: raise APIError( - "Invalid content type requested. Only text/plain supported for now." + "Invalid content type requested. Only text/plain and text/x-log " + "supported for now." ) + if request.headers[ACCEPT] == CONTENT_TYPE_X_LOG: + log_formatter = LogFormatter.VERBOSE + if RANGE in request.headers: range_header = request.headers.get(RANGE) else: range_header = f"entries=:-{DEFAULT_RANGE}:" async with self.sys_host.logs.journald_logs( - params=params, range_header=range_header + params=params, range_header=range_header, accept=LogFormat.JOURNAL ) as resp: try: response = web.StreamResponse() response.content_type = CONTENT_TYPE_TEXT await response.prepare(request) - async for data in resp.content: - await response.write(data) + async for line in journal_logs_reader(resp, log_formatter): + await response.write(line.encode("utf-8") + b"\n") except ConnectionResetError as ex: raise APIError( "Connection reset when trying to fetch data from systemd-journald." diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index d040b7707..9267289d3 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -509,6 +509,17 @@ class WhoamiConnectivityError(WhoamiError): """Connectivity errors while using whoami.""" +# utils/systemd_journal + + +class SystemdJournalError(HassioError): + """Error while processing systemd journal logs.""" + + +class MalformedBinaryEntryError(SystemdJournalError): + """Raised when binary entry in the journal isn't followed by a newline.""" + + # docker/api diff --git a/supervisor/host/const.py b/supervisor/host/const.py index 42931cc12..ea70325ef 100644 --- a/supervisor/host/const.py +++ b/supervisor/host/const.py @@ -62,3 +62,10 @@ class LogFormat(StrEnum): JOURNAL = "application/vnd.fdo.journal" JSON = "application/json" TEXT = "text/plain" + + +class LogFormatter(StrEnum): + """Log formatter.""" + + PLAIN = "plain" + VERBOSE = "verbose" diff --git a/supervisor/host/logs.py b/supervisor/host/logs.py index 27f7cab81..0e0ef5059 100644 --- a/supervisor/host/logs.py +++ b/supervisor/host/logs.py @@ -7,12 +7,18 @@ import logging from pathlib import Path from aiohttp import ClientError, ClientSession, ClientTimeout +from aiohttp.client_exceptions import UnixClientConnectorError from aiohttp.client_reqrep import ClientResponse from aiohttp.connector import UnixConnector from aiohttp.hdrs import ACCEPT, RANGE from ..coresys import CoreSys, CoreSysAttributes -from ..exceptions import ConfigurationFileError, HostLogError, HostNotSupportedError +from ..exceptions import ( + ConfigurationFileError, + HostLogError, + HostNotSupportedError, + HostServiceError, +) from ..utils.json import read_json_file from .const import PARAM_BOOT_ID, PARAM_SYSLOG_IDENTIFIER, LogFormat @@ -138,16 +144,21 @@ class LogsControl(CoreSysAttributes): "No systemd-journal-gatewayd Unix socket available", _LOGGER.error ) - async with ClientSession( - connector=UnixConnector(path="/run/systemd-journal-gatewayd.sock") - ) as session: - headers = {ACCEPT: accept} - if range_header: - headers[RANGE] = range_header - async with session.get( - f"http://localhost{path}", - headers=headers, - params=params or {}, - timeout=timeout, - ) as client_response: - yield client_response + try: + async with ClientSession( + connector=UnixConnector(path=str(SYSTEMD_JOURNAL_GATEWAYD_SOCKET)) + ) as session: + headers = {ACCEPT: accept} + if range_header: + headers[RANGE] = range_header + async with session.get( + f"http://localhost{path}", + headers=headers, + params=params or {}, + timeout=timeout, + ) as client_response: + yield client_response + except UnixClientConnectorError as ex: + raise HostServiceError( + "Unable to connect to systemd-journal-gatewayd", _LOGGER.error + ) from ex diff --git a/supervisor/utils/systemd_journal.py b/supervisor/utils/systemd_journal.py new file mode 100644 index 000000000..1ad71bbb0 --- /dev/null +++ b/supervisor/utils/systemd_journal.py @@ -0,0 +1,113 @@ +"""Utilities for working with systemd journal export format.""" +from collections.abc import AsyncGenerator +from datetime import UTC, datetime +from functools import wraps + +from aiohttp import ClientResponse + +from supervisor.exceptions import MalformedBinaryEntryError +from supervisor.host.const import LogFormatter + + +def formatter(required_fields: list[str]): + """Decorate journal entry formatters with list of required fields. + + Helper decorator that can be used for getting list of required fields for a journal + formatter function using function.required_fields function attribute. + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + wrapper.required_fields = required_fields + return wrapper + + return decorator + + +@formatter(["MESSAGE"]) +def journal_plain_formatter(entries: dict[str, str]) -> str: + """Format parsed journal entries as a plain message.""" + return entries["MESSAGE"] + + +@formatter( + [ + "__REALTIME_TIMESTAMP", + "_HOSTNAME", + "SYSLOG_IDENTIFIER", + "_PID", + "MESSAGE", + ] +) +def journal_verbose_formatter(entries: dict[str, str]) -> str: + """Format parsed journal entries to a journalctl-like format.""" + ts = datetime.fromtimestamp( + int(entries["__REALTIME_TIMESTAMP"]) / 1e6, UTC + ).isoformat(sep=" ", timespec="milliseconds") + ts = ts[: ts.index(".") + 4] # strip TZ offset + + identifier = ( + f"{entries.get("SYSLOG_IDENTIFIER", "_UNKNOWN_")}[{entries["_PID"]}]" + if "_PID" in entries + else entries.get("SYSLOG_IDENTIFIER", "_UNKNOWN_") + ) + + return f"{ts} {entries.get("_HOSTNAME", "")} {identifier}: {entries.get("MESSAGE", "")}" + + +async def journal_logs_reader( + journal_logs: ClientResponse, + log_formatter: LogFormatter = LogFormatter.PLAIN, +) -> AsyncGenerator[str, None]: + """Read logs from systemd journal line by line, formatted using the given formatter.""" + match log_formatter: + case LogFormatter.PLAIN: + formatter_ = journal_plain_formatter + case LogFormatter.VERBOSE: + formatter_ = journal_verbose_formatter + case _: + raise ValueError(f"Unknown log format: {log_formatter}") + + async with journal_logs as resp: + entries: dict[str, str] = {} + while not resp.content.at_eof(): + line = await resp.content.readuntil(b"\n") + # newline means end of message: + if line == b"\n": + yield formatter_(entries) + entries = {} + continue + + # Journal fields consisting only of valid non-control UTF-8 codepoints + # are serialized as they are (i.e. the field name, followed by '=', + # followed by field data), followed by a newline as separator to the next + # field. Note that fields containing newlines cannot be formatted like + # this. Non-control UTF-8 codepoints are the codepoints with value at or + # above 32 (' '), or equal to 9 (TAB). + name, sep, data = line.partition(b"=") + if not sep: + # Other journal fields are serialized in a special binary safe way: + # field name, followed by newline + name = name[:-1] # strip \n + # followed by a binary 64-bit little endian size value, + length_raw = await resp.content.readexactly(8) + length = int.from_bytes(length_raw, byteorder="little") + # followed by the binary field data, + data = await resp.content.readexactly(length + 1) + # followed by a newline as separator to the next field. + if not data.endswith(b"\n"): + raise MalformedBinaryEntryError( + f"Failed parsing binary entry {data}" + ) + + name = name.decode("utf-8") + if name not in formatter_.required_fields: + # we must read to the end of the entry in the stream, so we can + # only continue the loop here + continue + + # strip \n for simple fields before decoding + entries[name] = data[:-1].decode("utf-8") diff --git a/tests/api/test_host.py b/tests/api/test_host.py index 9b6146e6d..8df7cd91a 100644 --- a/tests/api/test_host.py +++ b/tests/api/test_host.py @@ -1,12 +1,13 @@ """Test Host API.""" -from unittest.mock import MagicMock +from unittest.mock import ANY, MagicMock from aiohttp.test_utils import TestClient import pytest from supervisor.coresys import CoreSys from supervisor.dbus.resolved import Resolved +from supervisor.host.const import LogFormat, LogFormatter DEFAULT_RANGE = "entries=:-100:" # pylint: disable=protected-access @@ -154,6 +155,7 @@ async def test_advanced_logs( journald_logs.assert_called_once_with( params={"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers}, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -161,7 +163,9 @@ async def test_advanced_logs( identifier = "dropbear" await api_client.get(f"/host/logs/identifiers/{identifier}") journald_logs.assert_called_once_with( - params={"SYSLOG_IDENTIFIER": identifier}, range_header=DEFAULT_RANGE + params={"SYSLOG_IDENTIFIER": identifier}, + range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -174,6 +178,7 @@ async def test_advanced_logs( "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -182,6 +187,7 @@ async def test_advanced_logs( journald_logs.assert_called_once_with( params={"_BOOT_ID": bootid, "SYSLOG_IDENTIFIER": identifier}, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -191,6 +197,7 @@ async def test_advanced_logs( journald_logs.assert_called_once_with( params={"SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers}, range_header=headers["Range"], + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -202,6 +209,7 @@ async def test_advanced_logs( "follow": "", }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) @@ -216,6 +224,7 @@ async def test_advanced_logs_boot_id_offset( "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -227,6 +236,7 @@ async def test_advanced_logs_boot_id_offset( "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() @@ -238,11 +248,41 @@ async def test_advanced_logs_boot_id_offset( "SYSLOG_IDENTIFIER": coresys.host.logs.default_identifiers, }, range_header=DEFAULT_RANGE, + accept=LogFormat.JOURNAL, ) journald_logs.reset_mock() +async def test_advanced_logs_formatters( + api_client: TestClient, + coresys: CoreSys, + journald_gateway: MagicMock, + journal_logs_reader: MagicMock, +): + """Test advanced logs formatters varying on Accept header.""" + + await api_client.get("/host/logs") + journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE) + + journal_logs_reader.reset_mock() + + headers = {"Accept": "text/x-log"} + await api_client.get("/host/logs", headers=headers) + journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE) + + journal_logs_reader.reset_mock() + + await api_client.get("/host/logs/identifiers/test") + journal_logs_reader.assert_called_once_with(ANY, LogFormatter.PLAIN) + + journal_logs_reader.reset_mock() + + headers = {"Accept": "text/x-log"} + await api_client.get("/host/logs/identifiers/test", headers=headers) + journal_logs_reader.assert_called_once_with(ANY, LogFormatter.VERBOSE) + + async def test_advanced_logs_errors(api_client: TestClient): """Test advanced logging API errors.""" # coresys = coresys_logs_control @@ -257,5 +297,5 @@ async def test_advanced_logs_errors(api_client: TestClient): assert result["result"] == "error" assert ( result["message"] - == "Invalid content type requested. Only text/plain supported for now." + == "Invalid content type requested. Only text/plain and text/x-log supported for now." ) diff --git a/tests/conftest.py b/tests/conftest.py index 6fdd2f74f..90b15dc6f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ """Common test functions.""" +import asyncio from functools import partial from inspect import unwrap import os @@ -47,12 +48,7 @@ from supervisor.store.addon import AddonStore from supervisor.store.repository import Repository from supervisor.utils.dt import utcnow -from .common import ( - load_binary_fixture, - load_fixture, - load_json_fixture, - mock_dbus_services, -) +from .common import load_binary_fixture, load_json_fixture, mock_dbus_services from .const import TEST_ADDON_SLUG from .dbus_service_mocks.base import DBusServiceMock from .dbus_service_mocks.network_connection_settings import ( @@ -408,10 +404,28 @@ async def journald_gateway() -> MagicMock: with patch("supervisor.host.logs.Path.is_socket", return_value=True), patch( "supervisor.host.logs.ClientSession.get" ) as get: - get.return_value.__aenter__.return_value.text = AsyncMock( - return_value=load_fixture("logs_host.txt") + reader = asyncio.StreamReader(loop=asyncio.get_running_loop()) + + async def response_text(): + return (await reader.read()).decode("utf-8") + + client_response = MagicMock( + content=reader, + text=response_text, ) - yield get + + get.return_value.__aenter__.return_value = client_response + get.return_value.__aenter__.return_value.__aenter__.return_value = ( + client_response + ) + yield reader + + +@pytest.fixture +async def journal_logs_reader() -> MagicMock: + """Mock journal_logs_reader in host API.""" + with patch("supervisor.api.host.journal_logs_reader") as reader: + yield reader @pytest.fixture diff --git a/tests/fixtures/logs_export_host.txt b/tests/fixtures/logs_export_host.txt new file mode 100644 index 000000000..f7223445e --- /dev/null +++ b/tests/fixtures/logs_export_host.txt @@ -0,0 +1,36 @@ +__CURSOR=s=83fee99ca0c3466db5fc120d52ca7dd8;i=203f2ce;b=f5a5c442fa6548cf97474d2d57c920b3;m=3191a3c620;t=612ccd299e7af;x=8675b540119d10bb +__REALTIME_TIMESTAMP=1709520776193967 +__MONOTONIC_TIMESTAMP=212896826912 +__SEQNUM=33813198 +__SEQNUM_ID=83fee99ca0c3466db5fc120d52ca7dd8 +_BOOT_ID=f5a5c442fa6548cf97474d2d57c920b3 +PRIORITY=6 +_TRANSPORT=journal +_UID=0 +_GID=0 +_CAP_EFFECTIVE=1ffffffffff +_MACHINE_ID=edb710b8363b4ff48aed6e75a27f85a6 +_HOSTNAME=homeassistant +_RUNTIME_SCOPE=system +SYSLOG_FACILITY=37 +TID=1 +SYSLOG_IDENTIFIER=systemd +_PID=1 +_COMM=systemd +_EXE=/usr/lib/systemd/systemd +_CMDLINE=/sbin/init +_SYSTEMD_CGROUP=/init.scope +_SYSTEMD_UNIT=init.scope +_SYSTEMD_SLICE=-.slice +CODE_FILE=src/core/job.c +JOB_TYPE=start +UNIT=systemd-hostnamed.service +CODE_LINE=768 +CODE_FUNC=job_emit_done_message +MESSAGE=Started Hostname Service. +JOB_RESULT=done +MESSAGE_ID=39f53479d3a045ac8e11786248231fbf +JOB_ID=7205 +INVOCATION_ID=e38ca30edd074417b011192307f10811 +_SOURCE_REALTIME_TIMESTAMP=1709520776193926 + diff --git a/tests/fixtures/logs_export_supervisor.txt b/tests/fixtures/logs_export_supervisor.txt new file mode 100644 index 000000000..c038c9da1 Binary files /dev/null and b/tests/fixtures/logs_export_supervisor.txt differ diff --git a/tests/host/test_logs.py b/tests/host/test_logs.py index de90fa235..312eeb2bd 100644 --- a/tests/host/test_logs.py +++ b/tests/host/test_logs.py @@ -1,12 +1,16 @@ """Test host logs control.""" -from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch +from unittest.mock import MagicMock, PropertyMock, patch +from aiohttp.client_exceptions import UnixClientConnectorError +from aiohttp.client_reqrep import ConnectionKey import pytest from supervisor.coresys import CoreSys -from supervisor.exceptions import HostNotSupportedError +from supervisor.exceptions import HostNotSupportedError, HostServiceError +from supervisor.host.const import LogFormatter from supervisor.host.logs import LogsControl +from supervisor.utils.systemd_journal import journal_logs_reader from tests.common import load_fixture @@ -32,10 +36,16 @@ async def test_logs(coresys: CoreSys, journald_gateway: MagicMock): """Test getting logs and errors.""" assert coresys.host.logs.available is True + journald_gateway.feed_data(load_fixture("logs_export_host.txt").encode("utf-8")) + journald_gateway.feed_eof() + async with coresys.host.logs.journald_logs() as resp: - body = await resp.text() + line = await anext( + journal_logs_reader(resp, log_formatter=LogFormatter.VERBOSE) + ) assert ( - "Oct 11 20:46:22 odroid-dev systemd[1]: Started Hostname Service." in body + line + == "2024-03-04 02:52:56.193 homeassistant systemd[1]: Started Hostname Service." ) with patch.object( @@ -45,11 +55,25 @@ async def test_logs(coresys: CoreSys, journald_gateway: MagicMock): pass +async def test_logs_coloured(coresys: CoreSys, journald_gateway: MagicMock): + """Test ANSI control sequences being preserved in binary messages.""" + journald_gateway.feed_data( + load_fixture("logs_export_supervisor.txt").encode("utf-8") + ) + journald_gateway.feed_eof() + + async with coresys.host.logs.journald_logs() as resp: + line = await anext(journal_logs_reader(resp)) + assert ( + line + == "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m" + ) + + async def test_boot_ids(coresys: CoreSys, journald_gateway: MagicMock): """Test getting boot ids.""" - journald_gateway.return_value.__aenter__.return_value.text = AsyncMock( - return_value=load_fixture("logs_boot_ids.txt") - ) + journald_gateway.feed_data(load_fixture("logs_boot_ids.txt").encode("utf-8")) + journald_gateway.feed_eof() assert await coresys.host.logs.get_boot_ids() == TEST_BOOT_IDS @@ -73,9 +97,8 @@ async def test_boot_ids(coresys: CoreSys, journald_gateway: MagicMock): async def test_identifiers(coresys: CoreSys, journald_gateway: MagicMock): """Test getting identifiers.""" - journald_gateway.return_value.__aenter__.return_value.text = AsyncMock( - return_value=load_fixture("logs_identifiers.txt") - ) + journald_gateway.feed_data(load_fixture("logs_identifiers.txt").encode("utf-8")) + journald_gateway.feed_eof() # Mock is large so just look for a few different types of identifiers identifiers = await coresys.host.logs.get_identifiers() @@ -89,3 +112,21 @@ async def test_identifiers(coresys: CoreSys, journald_gateway: MagicMock): assert identifier in identifiers assert "" not in identifiers + + +async def test_connection_refused_handled( + coresys: CoreSys, journald_gateway: MagicMock +): + """Test connection refused is handled with HostServiceError.""" + with patch("supervisor.host.logs.ClientSession.get") as get: + get.side_effect = UnixClientConnectorError( + path="/run/systemd-journal-gatewayd.sock", + connection_key=ConnectionKey( + "localhost", None, False, False, None, None, None + ), + os_error=ConnectionRefusedError("Connection refused"), + ) + + with pytest.raises(HostServiceError): + async with coresys.host.logs.journald_logs(): + pass diff --git a/tests/utils/test_systemd_journal.py b/tests/utils/test_systemd_journal.py new file mode 100644 index 000000000..5c9f23151 --- /dev/null +++ b/tests/utils/test_systemd_journal.py @@ -0,0 +1,188 @@ +"""Test systemd journal utilities.""" +import asyncio +from unittest.mock import MagicMock + +import pytest + +from supervisor.exceptions import MalformedBinaryEntryError +from supervisor.host.const import LogFormatter +from supervisor.utils.systemd_journal import ( + journal_logs_reader, + journal_plain_formatter, + journal_verbose_formatter, +) + +from tests.common import load_fixture + + +def _journal_logs_mock(): + """Generate mocked stream for journal_logs_reader. + + Returns tuple for mocking ClientResponse and its StreamReader + (.content attribute in async context). + """ + stream = asyncio.StreamReader(loop=asyncio.get_running_loop()) + journal_logs = MagicMock() + journal_logs.__aenter__.return_value.content = stream + return journal_logs, stream + + +def test_format_simple(): + """Test plain formatter.""" + fields = {"MESSAGE": "Hello, world!"} + assert journal_plain_formatter(fields) == "Hello, world!" + + +def test_format_simple_newlines(): + """Test plain formatter with newlines in message.""" + fields = {"MESSAGE": "Hello,\nworld!\n"} + assert journal_plain_formatter(fields) == "Hello,\nworld!\n" + + +def test_format_verbose_timestamp(): + """Test timestamp is properly formatted.""" + fields = { + "__REALTIME_TIMESTAMP": "1000", + "_HOSTNAME": "x", + "SYSLOG_IDENTIFIER": "x", + "_PID": "1", + "MESSAGE": "x", + } + formatted = journal_verbose_formatter(fields) + assert formatted.startswith( + "1970-01-01 00:00:00.001 " + ), f"Invalid log timestamp: {formatted}" + + +def test_format_verbose(): + """Test verbose formatter.""" + fields = { + "__REALTIME_TIMESTAMP": "1379403171000000", + "_HOSTNAME": "homeassistant", + "SYSLOG_IDENTIFIER": "python", + "_PID": "666", + "MESSAGE": "Hello, world!", + } + assert ( + journal_verbose_formatter(fields) + == "2013-09-17 07:32:51.000 homeassistant python[666]: Hello, world!" + ) + + +def test_format_verbose_newlines(): + """Test verbose formatter with newlines in message.""" + fields = { + "__REALTIME_TIMESTAMP": "1379403171000000", + "_HOSTNAME": "homeassistant", + "SYSLOG_IDENTIFIER": "python", + "_PID": "666", + "MESSAGE": "Hello,\nworld!\n", + } + assert ( + journal_verbose_formatter(fields) + == "2013-09-17 07:32:51.000 homeassistant python[666]: Hello,\nworld!\n" + ) + + +async def test_parsing_simple(): + """Test plain formatter.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data(b"MESSAGE=Hello, world!\n\n") + line = await anext(journal_logs_reader(journal_logs)) + assert line == "Hello, world!" + + +async def test_parsing_verbose(): + """Test verbose formatter.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b"__REALTIME_TIMESTAMP=1379403171000000\n" + b"_HOSTNAME=homeassistant\n" + b"SYSLOG_IDENTIFIER=python\n" + b"_PID=666\n" + b"MESSAGE=Hello, world!\n\n" + ) + line = await anext( + journal_logs_reader(journal_logs, log_formatter=LogFormatter.VERBOSE) + ) + assert line == "2013-09-17 07:32:51.000 homeassistant python[666]: Hello, world!" + + +async def test_parsing_newlines_in_message(): + """Test reading and formatting using journal logs reader.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b"ID=1\n" + b"MESSAGE\n\x0d\x00\x00\x00\x00\x00\x00\x00Hello,\nworld!\n" + b"AFTER=after\n\n" + ) + + line = await anext(journal_logs_reader(journal_logs)) + assert line == "Hello,\nworld!" + + +async def test_parsing_newlines_in_multiple_fields(): + """Test entries are correctly separated with newlines in multiple fields.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b"ID=1\n" + b"MESSAGE\n\x0e\x00\x00\x00\x00\x00\x00\x00Hello,\nworld!\n\n" + b"ANOTHER\n\x0e\x00\x00\x00\x00\x00\x00\x00Hello,\nworld!\n\n" + b"AFTER=after\n\n" + b"ID=2\n" + b"MESSAGE\n\x0d\x00\x00\x00\x00\x00\x00\x00Hello,\nworld!\n" + b"AFTER=after\n\n" + ) + + assert await anext(journal_logs_reader(journal_logs)) == "Hello,\nworld!\n" + assert await anext(journal_logs_reader(journal_logs)) == "Hello,\nworld!" + + +async def test_parsing_two_messages(): + """Test reading multiple messages.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b"MESSAGE=Hello, world!\n" + b"ID=1\n\n" + b"MESSAGE=Hello again, world!\n" + b"ID=2\n\n" + ) + stream.feed_eof() + + reader = journal_logs_reader(journal_logs) + assert await anext(reader) == "Hello, world!" + assert await anext(reader) == "Hello again, world!" + with pytest.raises(StopAsyncIteration): + await anext(reader) + + +async def test_parsing_malformed_binary_message(): + """Test that malformed binary message raises MalformedBinaryEntryError.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data( + b"ID=1\n" + b"MESSAGE\n\x0d\x00\x00\x00\x00\x00\x00\x00Hello, world!" + b"AFTER=after\n\n" + ) + + with pytest.raises(MalformedBinaryEntryError): + await anext(journal_logs_reader(journal_logs)) + + +async def test_parsing_journal_host_logs(): + """Test parsing of real host logs.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data(load_fixture("logs_export_host.txt").encode("utf-8")) + line = await anext(journal_logs_reader(journal_logs)) + assert line == "Started Hostname Service." + + +async def test_parsing_colored_supervisor_logs(): + """Test parsing of real logs with ANSI escape sequences.""" + journal_logs, stream = _journal_logs_mock() + stream.feed_data(load_fixture("logs_export_supervisor.txt").encode("utf-8")) + line = await anext(journal_logs_reader(journal_logs)) + assert ( + line + == "\x1b[32m24-03-04 23:56:56 INFO (MainThread) [__main__] Closing Supervisor\x1b[0m" + )