diff --git a/supervisor/__main__.py b/supervisor/__main__.py index c4e5711e7..e43fff583 100644 --- a/supervisor/__main__.py +++ b/supervisor/__main__.py @@ -6,6 +6,7 @@ from pathlib import Path import sys from supervisor import bootstrap +from supervisor.utils.logging import activate_log_queue_handler _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -38,6 +39,8 @@ if __name__ == "__main__": executor = ThreadPoolExecutor(thread_name_prefix="SyncWorker") loop.set_default_executor(executor) + activate_log_queue_handler() + _LOGGER.info("Initializing Supervisor setup") coresys = loop.run_until_complete(bootstrap.initialize_coresys()) loop.set_debug(coresys.config.debug) diff --git a/supervisor/utils/logging.py b/supervisor/utils/logging.py new file mode 100644 index 000000000..346160c23 --- /dev/null +++ b/supervisor/utils/logging.py @@ -0,0 +1,73 @@ +"""Logging utilities.""" +from __future__ import annotations + +import logging +import logging.handlers +import queue +from typing import Any + + +class SupervisorQueueHandler(logging.handlers.QueueHandler): + """Process the log in another thread.""" + + listener: logging.handlers.QueueListener | None = None + + def prepare(self, record: logging.LogRecord) -> logging.LogRecord: + """Prepare a record for queuing. + + This is added as a workaround for https://bugs.python.org/issue46755 + """ + record = super().prepare(record) + record.stack_info = None + return record + + def handle(self, record: logging.LogRecord) -> Any: + """Conditionally emit the specified logging record. + + Depending on which filters have been added to the handler, push the new + records onto the backing Queue. + + The default python logger Handler acquires a lock + in the parent class which we do not need as + SimpleQueue is already thread safe. + + See https://bugs.python.org/issue24645 + """ + return_value = self.filter(record) + if return_value: + self.emit(record) + return return_value + + def close(self) -> None: + """Tidy up any resources used by the handler. + + This adds shutdown of the QueueListener + """ + super().close() + if not self.listener: + return + self.listener.stop() + self.listener = None + + +def activate_log_queue_handler() -> None: + """Migrate the existing log handlers to use the queue. + + This allows us to avoid blocking I/O and formatting messages + in the event loop as log messages are written in another thread. + """ + simple_queue: queue.SimpleQueue[logging.Handler] = queue.SimpleQueue() + queue_handler = SupervisorQueueHandler(simple_queue) + logging.root.addHandler(queue_handler) + + migrated_handlers: list[logging.Handler] = [] + for handler in logging.root.handlers[:]: + if handler is queue_handler: + continue + logging.root.removeHandler(handler) + migrated_handlers.append(handler) + + listener = logging.handlers.QueueListener(simple_queue, *migrated_handlers) + queue_handler.listener = listener + + listener.start() diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py new file mode 100644 index 000000000..2a4f1ca99 --- /dev/null +++ b/tests/utils/test_logging.py @@ -0,0 +1,62 @@ +"""Test supervisor logging util methods.""" +import asyncio +import logging +import queue +from unittest.mock import patch + +import pytest + +import supervisor.utils.logging as logging_util + + +async def test_logging_with_queue_handler() -> None: + """Test logging with SupervisorQueueHandler.""" + + simple_queue = queue.SimpleQueue() # type: ignore + handler = logging_util.SupervisorQueueHandler(simple_queue) + + log_record = logging.makeLogRecord({"msg": "Test Log Record"}) + + handler.emit(log_record) + + with pytest.raises(asyncio.CancelledError), patch.object( + handler, "enqueue", side_effect=asyncio.CancelledError + ): + handler.emit(log_record) + + with patch.object(handler, "emit") as emit_mock: + handler.handle(log_record) + emit_mock.assert_called_once() + + with patch.object(handler, "filter") as filter_mock, patch.object( + handler, "emit" + ) as emit_mock: + filter_mock.return_value = False + handler.handle(log_record) + emit_mock.assert_not_called() + + with patch.object(handler, "enqueue", side_effect=OSError), patch.object( + handler, "handleError" + ) as mock_handle_error: + handler.emit(log_record) + mock_handle_error.assert_called_once() + + handler.close() + + assert simple_queue.get_nowait().msg == "Test Log Record" + assert simple_queue.empty() + + +async def test_migrate_log_handler() -> None: + """Test migrating log handlers.""" + + logging_util.activate_log_queue_handler() + + assert len(logging.root.handlers) == 1 + assert isinstance(logging.root.handlers[0], logging_util.SupervisorQueueHandler) + + # Test that the close hook shuts down the queue handler's thread + listener_thread = logging.root.handlers[0].listener._thread + assert listener_thread.is_alive() + logging.root.handlers[0].close() + assert not listener_thread.is_alive()