From e208d8b93edd3fc29d99121b6286b6324cf7de84 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 4 Aug 2020 09:21:45 -1000 Subject: [PATCH] Move system log processing out of the event loop (#38445) --- .../components/system_log/__init__.py | 55 +++++++++--- tests/components/system_log/test_init.py | 88 +++++++++++++++---- 2 files changed, 112 insertions(+), 31 deletions(-) diff --git a/homeassistant/components/system_log/__init__.py b/homeassistant/components/system_log/__init__.py index bf49de5a731..6f658962fe0 100644 --- a/homeassistant/components/system_log/__init__.py +++ b/homeassistant/components/system_log/__init__.py @@ -1,6 +1,8 @@ """Support for system log.""" +import asyncio from collections import OrderedDict, deque import logging +import queue import re import traceback @@ -8,7 +10,8 @@ import voluptuous as vol from homeassistant import __path__ as HOMEASSISTANT_PATH from homeassistant.components.http import HomeAssistantView -from homeassistant.const import EVENT_HOMEASSISTANT_STOP +from homeassistant.const import EVENT_HOMEASSISTANT_CLOSE, EVENT_HOMEASSISTANT_STOP +from homeassistant.core import callback import homeassistant.helpers.config_validation as cv CONF_MAX_ENTRIES = "max_entries" @@ -155,6 +158,19 @@ class DedupStore(OrderedDict): return [value.to_dict() for value in reversed(self.values())] +class LogErrorQueueHandler(logging.handlers.QueueHandler): + """Process the log in another thread.""" + + def emit(self, record): + """Emit a log record.""" + try: + self.enqueue(record) + except asyncio.CancelledError: # pylint: disable=try-except-raise + raise + except Exception: # pylint: disable=broad-except + self.handleError(record) + + class LogErrorHandler(logging.Handler): """Log handler for error messages.""" @@ -172,17 +188,14 @@ class LogErrorHandler(logging.Handler): default upper limit is set to 50 (older entries are discarded) but can be changed if needed. """ - if record.levelno >= logging.WARN: - stack = [] - if not record.exc_info: - stack = [(f[0], f[1]) for f in traceback.extract_stack()] + stack = [] + if not record.exc_info: + stack = [(f[0], f[1]) for f in traceback.extract_stack()] - entry = LogEntry( - record, stack, _figure_out_source(record, stack, self.hass) - ) - self.records.add_entry(entry) - if self.fire_event: - self.hass.bus.fire(EVENT_SYSTEM_LOG, entry.to_dict()) + entry = LogEntry(record, stack, _figure_out_source(record, stack, self.hass)) + self.records.add_entry(entry) + if self.fire_event: + self.hass.bus.fire(EVENT_SYSTEM_LOG, entry.to_dict()) async def async_setup(hass, config): @@ -191,8 +204,26 @@ async def async_setup(hass, config): if conf is None: conf = CONFIG_SCHEMA({DOMAIN: {}})[DOMAIN] + simple_queue = queue.SimpleQueue() + queue_handler = LogErrorQueueHandler(simple_queue) + queue_handler.setLevel(logging.WARN) + logging.root.addHandler(queue_handler) + handler = LogErrorHandler(hass, conf[CONF_MAX_ENTRIES], conf[CONF_FIRE_EVENT]) - logging.getLogger().addHandler(handler) + + listener = logging.handlers.QueueListener( + simple_queue, handler, respect_handler_level=True + ) + + listener.start() + + @callback + def _async_stop_queue_handler(_) -> None: + """Cleanup handler.""" + logging.root.removeHandler(queue_handler) + listener.stop() + + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, _async_stop_queue_handler) hass.http.register_view(AllErrorsView(handler)) diff --git a/tests/components/system_log/test_init.py b/tests/components/system_log/test_init.py index 009701ca886..d19ca2261bb 100644 --- a/tests/components/system_log/test_init.py +++ b/tests/components/system_log/test_init.py @@ -1,5 +1,9 @@ """Test system log component.""" +import asyncio import logging +import queue + +import pytest from homeassistant.bootstrap import async_setup_component from homeassistant.components import system_log @@ -11,13 +15,34 @@ _LOGGER = logging.getLogger("test_logger") BASIC_CONFIG = {"system_log": {"max_entries": 2}} +@pytest.fixture +def simple_queue(): + """Fixture that get the queue.""" + simple_queue_fixed = queue.SimpleQueue() + with patch( + "homeassistant.components.system_log.queue.SimpleQueue", + return_value=simple_queue_fixed, + ): + yield simple_queue_fixed + + +async def _async_block_until_queue_empty(hass, sq): + # Unfortunately we are stuck with polling + await hass.async_block_till_done() + while not sq.empty(): + await asyncio.sleep(0.01) + await hass.async_block_till_done() + + async def get_error_log(hass, hass_client, expected_count): """Fetch all entries from system_log via the API.""" + client = await hass_client() resp = await client.get("/api/error/all") assert resp.status == 200 data = await resp.json() + assert len(data) == expected_count return data @@ -46,41 +71,49 @@ def get_frame(name): return (name, 5, None, None) -async def test_normal_logs(hass, hass_client): +async def test_normal_logs(hass, simple_queue, hass_client): """Test that debug and info are not logged.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) + _LOGGER.debug("debug") _LOGGER.info("info") + await _async_block_until_queue_empty(hass, simple_queue) # Assert done by get_error_log await get_error_log(hass, hass_client, 0) -async def test_exception(hass, hass_client): +async def test_exception(hass, simple_queue, hass_client): """Test that exceptions are logged and retrieved correctly.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) _generate_and_log_exception("exception message", "log message") + await _async_block_until_queue_empty(hass, simple_queue) + log = (await get_error_log(hass, hass_client, 1))[0] assert_log(log, "exception message", "log message", "ERROR") -async def test_warning(hass, hass_client): +async def test_warning(hass, simple_queue, hass_client): """Test that warning are logged and retrieved correctly.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) _LOGGER.warning("warning message") + await _async_block_until_queue_empty(hass, simple_queue) + log = (await get_error_log(hass, hass_client, 1))[0] assert_log(log, "", "warning message", "WARNING") -async def test_error(hass, hass_client): +async def test_error(hass, simple_queue, hass_client): """Test that errors are logged and retrieved correctly.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) _LOGGER.error("error message") + await _async_block_until_queue_empty(hass, simple_queue) + log = (await get_error_log(hass, hass_client, 1))[0] assert_log(log, "", "error message", "ERROR") -async def test_config_not_fire_event(hass): +async def test_config_not_fire_event(hass, simple_queue): """Test that errors are not posted as events with default config.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) events = [] @@ -93,12 +126,12 @@ async def test_config_not_fire_event(hass): hass.bus.async_listen(system_log.EVENT_SYSTEM_LOG, event_listener) _LOGGER.error("error message") - await hass.async_block_till_done() + await _async_block_until_queue_empty(hass, simple_queue) assert len(events) == 0 -async def test_error_posted_as_event(hass): +async def test_error_posted_as_event(hass, simple_queue): """Test that error are posted as events.""" await async_setup_component( hass, system_log.DOMAIN, {"system_log": {"max_entries": 2, "fire_event": True}} @@ -113,26 +146,30 @@ async def test_error_posted_as_event(hass): hass.bus.async_listen(system_log.EVENT_SYSTEM_LOG, event_listener) _LOGGER.error("error message") - await hass.async_block_till_done() + await _async_block_until_queue_empty(hass, simple_queue) assert len(events) == 1 assert_log(events[0].data, "", "error message", "ERROR") -async def test_critical(hass, hass_client): +async def test_critical(hass, simple_queue, hass_client): """Test that critical are logged and retrieved correctly.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) _LOGGER.critical("critical message") + await _async_block_until_queue_empty(hass, simple_queue) + log = (await get_error_log(hass, hass_client, 1))[0] assert_log(log, "", "critical message", "CRITICAL") -async def test_remove_older_logs(hass, hass_client): +async def test_remove_older_logs(hass, simple_queue, hass_client): """Test that older logs are rotated out.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) _LOGGER.error("error message 1") _LOGGER.error("error message 2") _LOGGER.error("error message 3") + await _async_block_until_queue_empty(hass, simple_queue) + log = await get_error_log(hass, hass_client, 2) assert_log(log[0], "", "error message 3", "ERROR") assert_log(log[1], "", "error message 2", "ERROR") @@ -143,19 +180,23 @@ def log_msg(nr=2): _LOGGER.error("error message %s", nr) -async def test_dedup_logs(hass, hass_client): +async def test_dedup_logs(hass, simple_queue, hass_client): """Test that duplicate log entries are dedup.""" await async_setup_component(hass, system_log.DOMAIN, {}) _LOGGER.error("error message 1") log_msg() log_msg("2-2") _LOGGER.error("error message 3") + await _async_block_until_queue_empty(hass, simple_queue) + log = await get_error_log(hass, hass_client, 3) assert_log(log[0], "", "error message 3", "ERROR") assert log[1]["count"] == 2 assert_log(log[1], "", ["error message 2", "error message 2-2"], "ERROR") log_msg() + await _async_block_until_queue_empty(hass, simple_queue) + log = await get_error_log(hass, hass_client, 3) assert_log(log[0], "", ["error message 2", "error message 2-2"], "ERROR") assert log[0]["timestamp"] > log[0]["first_occurred"] @@ -164,6 +205,8 @@ async def test_dedup_logs(hass, hass_client): log_msg("2-4") log_msg("2-5") log_msg("2-6") + await _async_block_until_queue_empty(hass, simple_queue) + log = await get_error_log(hass, hass_client, 3) assert_log( log[0], @@ -179,15 +222,16 @@ async def test_dedup_logs(hass, hass_client): ) -async def test_clear_logs(hass, hass_client): +async def test_clear_logs(hass, simple_queue, hass_client): """Test that the log can be cleared via a service call.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) _LOGGER.error("error message") + await _async_block_until_queue_empty(hass, simple_queue) hass.async_add_job( hass.services.async_call(system_log.DOMAIN, system_log.SERVICE_CLEAR, {}) ) - await hass.async_block_till_done() + await _async_block_until_queue_empty(hass, simple_queue) # Assert done by get_error_log await get_error_log(hass, hass_client, 0) @@ -239,16 +283,17 @@ async def test_write_choose_level(hass): assert logger.method_calls[0] == ("debug", ("test_message",)) -async def test_unknown_path(hass, hass_client): +async def test_unknown_path(hass, simple_queue, hass_client): """Test error logged from unknown path.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) _LOGGER.findCaller = MagicMock(return_value=("unknown_path", 0, None, None)) _LOGGER.error("error message") + await _async_block_until_queue_empty(hass, simple_queue) log = (await get_error_log(hass, hass_client, 1))[0] assert log["source"] == ["unknown_path", 0] -def log_error_from_test_path(path): +async def async_log_error_from_test_path(hass, path, sq): """Log error while mocking the path.""" call_path = "internal_path.py" with patch.object( @@ -266,24 +311,29 @@ def log_error_from_test_path(path): ), ): _LOGGER.error("error message") + await _async_block_until_queue_empty(hass, sq) -async def test_homeassistant_path(hass, hass_client): +async def test_homeassistant_path(hass, simple_queue, hass_client): """Test error logged from Home Assistant path.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) with patch( "homeassistant.components.system_log.HOMEASSISTANT_PATH", new=["venv_path/homeassistant"], ): - log_error_from_test_path("venv_path/homeassistant/component/component.py") + await async_log_error_from_test_path( + hass, "venv_path/homeassistant/component/component.py", simple_queue + ) log = (await get_error_log(hass, hass_client, 1))[0] assert log["source"] == ["component/component.py", 5] -async def test_config_path(hass, hass_client): +async def test_config_path(hass, simple_queue, hass_client): """Test error logged from config path.""" await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG) with patch.object(hass.config, "config_dir", new="config"): - log_error_from_test_path("config/custom_component/test.py") + await async_log_error_from_test_path( + hass, "config/custom_component/test.py", simple_queue + ) log = (await get_error_log(hass, hass_client, 1))[0] assert log["source"] == ["custom_component/test.py", 5]