Move system log processing out of the event loop (#38445)

This commit is contained in:
J. Nick Koston 2020-08-04 09:21:45 -10:00 committed by GitHub
parent 31dbdff3c4
commit e208d8b93e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 112 additions and 31 deletions

View File

@ -1,6 +1,8 @@
"""Support for system log."""
import asyncio
from collections import OrderedDict, deque
import logging
import queue
import re
import traceback
@ -8,7 +10,8 @@ import voluptuous as vol
from homeassistant import __path__ as HOMEASSISTANT_PATH
from homeassistant.components.http import HomeAssistantView
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.const import EVENT_HOMEASSISTANT_CLOSE, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv
CONF_MAX_ENTRIES = "max_entries"
@ -155,6 +158,19 @@ class DedupStore(OrderedDict):
return [value.to_dict() for value in reversed(self.values())]
class LogErrorQueueHandler(logging.handlers.QueueHandler):
"""Process the log in another thread."""
def emit(self, record):
"""Emit a log record."""
try:
self.enqueue(record)
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception: # pylint: disable=broad-except
self.handleError(record)
class LogErrorHandler(logging.Handler):
"""Log handler for error messages."""
@ -172,17 +188,14 @@ class LogErrorHandler(logging.Handler):
default upper limit is set to 50 (older entries are discarded) but can
be changed if needed.
"""
if record.levelno >= logging.WARN:
stack = []
if not record.exc_info:
stack = [(f[0], f[1]) for f in traceback.extract_stack()]
stack = []
if not record.exc_info:
stack = [(f[0], f[1]) for f in traceback.extract_stack()]
entry = LogEntry(
record, stack, _figure_out_source(record, stack, self.hass)
)
self.records.add_entry(entry)
if self.fire_event:
self.hass.bus.fire(EVENT_SYSTEM_LOG, entry.to_dict())
entry = LogEntry(record, stack, _figure_out_source(record, stack, self.hass))
self.records.add_entry(entry)
if self.fire_event:
self.hass.bus.fire(EVENT_SYSTEM_LOG, entry.to_dict())
async def async_setup(hass, config):
@ -191,8 +204,26 @@ async def async_setup(hass, config):
if conf is None:
conf = CONFIG_SCHEMA({DOMAIN: {}})[DOMAIN]
simple_queue = queue.SimpleQueue()
queue_handler = LogErrorQueueHandler(simple_queue)
queue_handler.setLevel(logging.WARN)
logging.root.addHandler(queue_handler)
handler = LogErrorHandler(hass, conf[CONF_MAX_ENTRIES], conf[CONF_FIRE_EVENT])
logging.getLogger().addHandler(handler)
listener = logging.handlers.QueueListener(
simple_queue, handler, respect_handler_level=True
)
listener.start()
@callback
def _async_stop_queue_handler(_) -> None:
"""Cleanup handler."""
logging.root.removeHandler(queue_handler)
listener.stop()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, _async_stop_queue_handler)
hass.http.register_view(AllErrorsView(handler))

View File

@ -1,5 +1,9 @@
"""Test system log component."""
import asyncio
import logging
import queue
import pytest
from homeassistant.bootstrap import async_setup_component
from homeassistant.components import system_log
@ -11,13 +15,34 @@ _LOGGER = logging.getLogger("test_logger")
BASIC_CONFIG = {"system_log": {"max_entries": 2}}
@pytest.fixture
def simple_queue():
"""Fixture that get the queue."""
simple_queue_fixed = queue.SimpleQueue()
with patch(
"homeassistant.components.system_log.queue.SimpleQueue",
return_value=simple_queue_fixed,
):
yield simple_queue_fixed
async def _async_block_until_queue_empty(hass, sq):
# Unfortunately we are stuck with polling
await hass.async_block_till_done()
while not sq.empty():
await asyncio.sleep(0.01)
await hass.async_block_till_done()
async def get_error_log(hass, hass_client, expected_count):
"""Fetch all entries from system_log via the API."""
client = await hass_client()
resp = await client.get("/api/error/all")
assert resp.status == 200
data = await resp.json()
assert len(data) == expected_count
return data
@ -46,41 +71,49 @@ def get_frame(name):
return (name, 5, None, None)
async def test_normal_logs(hass, hass_client):
async def test_normal_logs(hass, simple_queue, hass_client):
"""Test that debug and info are not logged."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
_LOGGER.debug("debug")
_LOGGER.info("info")
await _async_block_until_queue_empty(hass, simple_queue)
# Assert done by get_error_log
await get_error_log(hass, hass_client, 0)
async def test_exception(hass, hass_client):
async def test_exception(hass, simple_queue, hass_client):
"""Test that exceptions are logged and retrieved correctly."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
_generate_and_log_exception("exception message", "log message")
await _async_block_until_queue_empty(hass, simple_queue)
log = (await get_error_log(hass, hass_client, 1))[0]
assert_log(log, "exception message", "log message", "ERROR")
async def test_warning(hass, hass_client):
async def test_warning(hass, simple_queue, hass_client):
"""Test that warning are logged and retrieved correctly."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
_LOGGER.warning("warning message")
await _async_block_until_queue_empty(hass, simple_queue)
log = (await get_error_log(hass, hass_client, 1))[0]
assert_log(log, "", "warning message", "WARNING")
async def test_error(hass, hass_client):
async def test_error(hass, simple_queue, hass_client):
"""Test that errors are logged and retrieved correctly."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
_LOGGER.error("error message")
await _async_block_until_queue_empty(hass, simple_queue)
log = (await get_error_log(hass, hass_client, 1))[0]
assert_log(log, "", "error message", "ERROR")
async def test_config_not_fire_event(hass):
async def test_config_not_fire_event(hass, simple_queue):
"""Test that errors are not posted as events with default config."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
events = []
@ -93,12 +126,12 @@ async def test_config_not_fire_event(hass):
hass.bus.async_listen(system_log.EVENT_SYSTEM_LOG, event_listener)
_LOGGER.error("error message")
await hass.async_block_till_done()
await _async_block_until_queue_empty(hass, simple_queue)
assert len(events) == 0
async def test_error_posted_as_event(hass):
async def test_error_posted_as_event(hass, simple_queue):
"""Test that error are posted as events."""
await async_setup_component(
hass, system_log.DOMAIN, {"system_log": {"max_entries": 2, "fire_event": True}}
@ -113,26 +146,30 @@ async def test_error_posted_as_event(hass):
hass.bus.async_listen(system_log.EVENT_SYSTEM_LOG, event_listener)
_LOGGER.error("error message")
await hass.async_block_till_done()
await _async_block_until_queue_empty(hass, simple_queue)
assert len(events) == 1
assert_log(events[0].data, "", "error message", "ERROR")
async def test_critical(hass, hass_client):
async def test_critical(hass, simple_queue, hass_client):
"""Test that critical are logged and retrieved correctly."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
_LOGGER.critical("critical message")
await _async_block_until_queue_empty(hass, simple_queue)
log = (await get_error_log(hass, hass_client, 1))[0]
assert_log(log, "", "critical message", "CRITICAL")
async def test_remove_older_logs(hass, hass_client):
async def test_remove_older_logs(hass, simple_queue, hass_client):
"""Test that older logs are rotated out."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
_LOGGER.error("error message 1")
_LOGGER.error("error message 2")
_LOGGER.error("error message 3")
await _async_block_until_queue_empty(hass, simple_queue)
log = await get_error_log(hass, hass_client, 2)
assert_log(log[0], "", "error message 3", "ERROR")
assert_log(log[1], "", "error message 2", "ERROR")
@ -143,19 +180,23 @@ def log_msg(nr=2):
_LOGGER.error("error message %s", nr)
async def test_dedup_logs(hass, hass_client):
async def test_dedup_logs(hass, simple_queue, hass_client):
"""Test that duplicate log entries are dedup."""
await async_setup_component(hass, system_log.DOMAIN, {})
_LOGGER.error("error message 1")
log_msg()
log_msg("2-2")
_LOGGER.error("error message 3")
await _async_block_until_queue_empty(hass, simple_queue)
log = await get_error_log(hass, hass_client, 3)
assert_log(log[0], "", "error message 3", "ERROR")
assert log[1]["count"] == 2
assert_log(log[1], "", ["error message 2", "error message 2-2"], "ERROR")
log_msg()
await _async_block_until_queue_empty(hass, simple_queue)
log = await get_error_log(hass, hass_client, 3)
assert_log(log[0], "", ["error message 2", "error message 2-2"], "ERROR")
assert log[0]["timestamp"] > log[0]["first_occurred"]
@ -164,6 +205,8 @@ async def test_dedup_logs(hass, hass_client):
log_msg("2-4")
log_msg("2-5")
log_msg("2-6")
await _async_block_until_queue_empty(hass, simple_queue)
log = await get_error_log(hass, hass_client, 3)
assert_log(
log[0],
@ -179,15 +222,16 @@ async def test_dedup_logs(hass, hass_client):
)
async def test_clear_logs(hass, hass_client):
async def test_clear_logs(hass, simple_queue, hass_client):
"""Test that the log can be cleared via a service call."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
_LOGGER.error("error message")
await _async_block_until_queue_empty(hass, simple_queue)
hass.async_add_job(
hass.services.async_call(system_log.DOMAIN, system_log.SERVICE_CLEAR, {})
)
await hass.async_block_till_done()
await _async_block_until_queue_empty(hass, simple_queue)
# Assert done by get_error_log
await get_error_log(hass, hass_client, 0)
@ -239,16 +283,17 @@ async def test_write_choose_level(hass):
assert logger.method_calls[0] == ("debug", ("test_message",))
async def test_unknown_path(hass, hass_client):
async def test_unknown_path(hass, simple_queue, hass_client):
"""Test error logged from unknown path."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
_LOGGER.findCaller = MagicMock(return_value=("unknown_path", 0, None, None))
_LOGGER.error("error message")
await _async_block_until_queue_empty(hass, simple_queue)
log = (await get_error_log(hass, hass_client, 1))[0]
assert log["source"] == ["unknown_path", 0]
def log_error_from_test_path(path):
async def async_log_error_from_test_path(hass, path, sq):
"""Log error while mocking the path."""
call_path = "internal_path.py"
with patch.object(
@ -266,24 +311,29 @@ def log_error_from_test_path(path):
),
):
_LOGGER.error("error message")
await _async_block_until_queue_empty(hass, sq)
async def test_homeassistant_path(hass, hass_client):
async def test_homeassistant_path(hass, simple_queue, hass_client):
"""Test error logged from Home Assistant path."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
with patch(
"homeassistant.components.system_log.HOMEASSISTANT_PATH",
new=["venv_path/homeassistant"],
):
log_error_from_test_path("venv_path/homeassistant/component/component.py")
await async_log_error_from_test_path(
hass, "venv_path/homeassistant/component/component.py", simple_queue
)
log = (await get_error_log(hass, hass_client, 1))[0]
assert log["source"] == ["component/component.py", 5]
async def test_config_path(hass, hass_client):
async def test_config_path(hass, simple_queue, hass_client):
"""Test error logged from config path."""
await async_setup_component(hass, system_log.DOMAIN, BASIC_CONFIG)
with patch.object(hass.config, "config_dir", new="config"):
log_error_from_test_path("config/custom_component/test.py")
await async_log_error_from_test_path(
hass, "config/custom_component/test.py", simple_queue
)
log = (await get_error_log(hass, hass_client, 1))[0]
assert log["source"] == ["custom_component/test.py", 5]