mirror of
https://github.com/home-assistant/core.git
synced 2025-11-11 12:00:52 +00:00
Compare commits
5 Commits
copilot/ad
...
profiler-a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
631042237b | ||
|
|
e70a3042c2 | ||
|
|
c4eef15be7 | ||
|
|
aaab467b29 | ||
|
|
2d9b6e9c14 |
@@ -1,7 +1,7 @@
|
||||
"""The profiler integration."""
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Generator
|
||||
from collections.abc import Callable, Generator
|
||||
import contextlib
|
||||
from contextlib import suppress
|
||||
from datetime import timedelta
|
||||
@@ -12,7 +12,7 @@ import sys
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from typing import Any, cast
|
||||
from typing import Any, NamedTuple, cast
|
||||
|
||||
from lru import LRU
|
||||
import voluptuous as vol
|
||||
@@ -40,6 +40,8 @@ SERVICE_LOG_THREAD_FRAMES = "log_thread_frames"
|
||||
SERVICE_LOG_EVENT_LOOP_SCHEDULED = "log_event_loop_scheduled"
|
||||
SERVICE_SET_ASYNCIO_DEBUG = "set_asyncio_debug"
|
||||
SERVICE_LOG_CURRENT_TASKS = "log_current_tasks"
|
||||
SERVICE_START_AUDITING_EVENTS = "start_auditing_events"
|
||||
SERVICE_STOP_AUDITING_EVENTS = "stop_auditing_events"
|
||||
|
||||
_LRU_CACHE_WRAPPER_OBJECT = _lru_cache_wrapper.__name__
|
||||
_SQLALCHEMY_LRU_OBJECT = "LRUCache"
|
||||
@@ -66,14 +68,34 @@ SERVICES = (
|
||||
)
|
||||
|
||||
DEFAULT_SCAN_INTERVAL = timedelta(seconds=30)
|
||||
DEFAULT_SUMMARY_INTERVAL = timedelta(seconds=30)
|
||||
|
||||
DEFAULT_MAX_OBJECTS = 5
|
||||
|
||||
CONF_ENABLED = "enabled"
|
||||
CONF_SECONDS = "seconds"
|
||||
CONF_MAX_OBJECTS = "max_objects"
|
||||
CONF_EVENTS = "events"
|
||||
CONF_FILTER = "filter"
|
||||
CONF_SUMMARY_INTERVAL = "summary_interval"
|
||||
CONF_VERBOSE = "verbose"
|
||||
|
||||
LOG_INTERVAL_SUB = "log_interval_subscription"
|
||||
AUDITING_HOOK_ADDED = "auditing_hook_added"
|
||||
AUDITED_EVENTS = "audited_events"
|
||||
AUDIT_INTERVAL_SUB = "audit_interval_subscription"
|
||||
AUDIT_SUMMARY = "audit_summary"
|
||||
|
||||
|
||||
class AuditConfig(NamedTuple):
|
||||
"""Auditing configuration of a single audit event type."""
|
||||
|
||||
verbose: bool
|
||||
filter: str | None = None
|
||||
|
||||
|
||||
type SummaryByArgs = dict[str, int]
|
||||
type SummaryByTraceback = dict[str, SummaryByArgs]
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@@ -84,7 +106,11 @@ async def async_setup_entry( # noqa: C901
|
||||
) -> bool:
|
||||
"""Set up Profiler from a config entry."""
|
||||
lock = asyncio.Lock()
|
||||
domain_data = hass.data[DOMAIN] = {}
|
||||
|
||||
domain_data = hass.data[DOMAIN] = {
|
||||
AUDITING_HOOK_ADDED: False,
|
||||
AUDITED_EVENTS: {},
|
||||
}
|
||||
|
||||
async def _async_run_profile(call: ServiceCall) -> None:
|
||||
async with lock:
|
||||
@@ -273,6 +299,54 @@ async def async_setup_entry( # noqa: C901
|
||||
base_logger.setLevel(logging.INFO)
|
||||
hass.loop.set_debug(enabled)
|
||||
|
||||
async def _async_start_auditing_events(call: ServiceCall) -> None:
|
||||
"""Configure auditing of Python events."""
|
||||
events = call.data[CONF_EVENTS]
|
||||
verbose = call.data.get(CONF_VERBOSE, False)
|
||||
filter_ = call.data.get(CONF_FILTER)
|
||||
summary_interval = call.data.get(CONF_SUMMARY_INTERVAL, None)
|
||||
audited_events: dict[str, AuditConfig] = domain_data[AUDITED_EVENTS]
|
||||
|
||||
if summary_interval and AUDIT_INTERVAL_SUB in domain_data:
|
||||
raise HomeAssistantError("Already summarizing events")
|
||||
|
||||
for event in events:
|
||||
# Always log this at critical level so we know when
|
||||
# it's been changed when reviewing logs
|
||||
_LOGGER.critical(
|
||||
"Enabling %sauditing for event %s%s",
|
||||
"verbose " if verbose else "",
|
||||
event,
|
||||
f" with filter '{filter_}'" if filter_ else "",
|
||||
)
|
||||
audited_events[event] = AuditConfig(verbose=verbose, filter=filter_)
|
||||
|
||||
if not domain_data[AUDITING_HOOK_ADDED]:
|
||||
_LOGGER.info("Adding Python audit hook function")
|
||||
sys.addaudithook(_make_audit_hook(domain_data))
|
||||
domain_data[AUDITING_HOOK_ADDED] = True
|
||||
|
||||
# Force this module's logger at least to DEBUG to see the events
|
||||
if _LOGGER.getEffectiveLevel() > logging.DEBUG:
|
||||
_LOGGER.setLevel(logging.DEBUG)
|
||||
|
||||
if summary_interval:
|
||||
domain_data[AUDIT_SUMMARY] = {}
|
||||
|
||||
async def _log_events_summary_job(*_: Any) -> None:
|
||||
await hass.async_add_executor_job(_log_events_summary, domain_data)
|
||||
|
||||
domain_data[AUDIT_INTERVAL_SUB] = async_track_time_interval(
|
||||
hass, _log_events_summary_job, summary_interval
|
||||
)
|
||||
|
||||
async def _async_stop_auditing_events(call: ServiceCall) -> None:
|
||||
"""Stop auditing all Python events."""
|
||||
domain_data[AUDITED_EVENTS].clear()
|
||||
if AUDIT_INTERVAL_SUB in domain_data:
|
||||
domain_data.pop(AUDIT_INTERVAL_SUB)()
|
||||
domain_data.pop(AUDIT_SUMMARY)
|
||||
|
||||
async_register_admin_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
@@ -382,6 +456,28 @@ async def async_setup_entry( # noqa: C901
|
||||
_async_dump_current_tasks,
|
||||
)
|
||||
|
||||
async_register_admin_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
_async_start_auditing_events,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_EVENTS): vol.All(cv.ensure_list, [cv.string]),
|
||||
vol.Optional(CONF_VERBOSE, default=False): cv.boolean,
|
||||
vol.Optional(CONF_FILTER): cv.string,
|
||||
vol.Optional(CONF_SUMMARY_INTERVAL): cv.time_period,
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
async_register_admin_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
SERVICE_STOP_AUDITING_EVENTS,
|
||||
_async_stop_auditing_events,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -391,6 +487,13 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
hass.services.async_remove(domain=DOMAIN, service=service)
|
||||
if LOG_INTERVAL_SUB in hass.data[DOMAIN]:
|
||||
hass.data[DOMAIN][LOG_INTERVAL_SUB]()
|
||||
if hass.data[DOMAIN].get(AUDITING_HOOK_ADDED, False):
|
||||
_LOGGER.warning(
|
||||
"Python auditing hook cannot be removed, only suppressing logging"
|
||||
)
|
||||
hass.data[DOMAIN][AUDITED_EVENTS].clear()
|
||||
if AUDIT_INTERVAL_SUB in hass.data[DOMAIN]:
|
||||
hass.data[DOMAIN].pop(AUDIT_INTERVAL_SUB)()
|
||||
hass.data.pop(DOMAIN)
|
||||
return True
|
||||
|
||||
@@ -598,3 +701,66 @@ def _increase_repr_limit() -> Generator[None]:
|
||||
finally:
|
||||
arepr.maxstring = original_maxstring
|
||||
arepr.maxother = original_maxother
|
||||
|
||||
|
||||
def _make_audit_hook(domain_data: dict[str, Any]) -> Callable:
|
||||
"""Create an audit hook function that logs events in audited_events."""
|
||||
|
||||
def _audit_hook(event, args):
|
||||
audited_events: dict[str, AuditConfig] = domain_data.get(AUDITED_EVENTS, {})
|
||||
summary: SummaryByTraceback | None = domain_data.get(AUDIT_SUMMARY)
|
||||
|
||||
if event in audited_events:
|
||||
filter_ = audited_events[event].filter
|
||||
args_repr = repr(args)
|
||||
if filter_ and filter_ not in args_repr:
|
||||
return
|
||||
|
||||
summarized = summary is not None
|
||||
if summarized or audited_events[event].verbose:
|
||||
stack = reversed(
|
||||
traceback.format_stack()[:-1]
|
||||
) # exclude this function call
|
||||
tb = "".join(stack).strip()
|
||||
|
||||
if summarized:
|
||||
event_info = f"{event} {args_repr}"
|
||||
event_traceback_summary: SummaryByArgs = summary.setdefault(tb, {})
|
||||
event_traceback_summary.setdefault(event_info, 0)
|
||||
event_traceback_summary[event_info] += 1
|
||||
return
|
||||
|
||||
if audited_events[event].verbose:
|
||||
_LOGGER.debug(
|
||||
"Audited event: %s %s, traceback (most recent call first):\n %s",
|
||||
event,
|
||||
args_repr,
|
||||
tb,
|
||||
)
|
||||
else:
|
||||
_LOGGER.debug("Audited event: %s %s", event, args_repr)
|
||||
|
||||
return _audit_hook
|
||||
|
||||
|
||||
def _log_events_summary(domain_data: dict[str, Any]) -> None:
|
||||
"""Log a summary of audited events."""
|
||||
summary: SummaryByTraceback | None = domain_data.get(AUDIT_SUMMARY)
|
||||
|
||||
if not summary:
|
||||
_LOGGER.debug("No audited events in the last period")
|
||||
return
|
||||
|
||||
old_summary, domain_data[AUDIT_SUMMARY] = summary, {}
|
||||
|
||||
for tb, events in summary.items():
|
||||
_LOGGER.debug(
|
||||
"Audited events summary for traceback (most recent call first):"
|
||||
"\n %s"
|
||||
"\nEvent arguments:"
|
||||
"\n%s",
|
||||
tb,
|
||||
"\n".join(f" - {count}×: {event}" for event, count in events.items()),
|
||||
)
|
||||
|
||||
old_summary.clear()
|
||||
|
||||
@@ -35,6 +35,12 @@
|
||||
},
|
||||
"set_asyncio_debug": {
|
||||
"service": "mdi:bug-check"
|
||||
},
|
||||
"start_auditing_events": {
|
||||
"service": "mdi:clipboard-alert-outline"
|
||||
},
|
||||
"stop_auditing_events": {
|
||||
"service": "mdi:clipboard-alert"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,3 +60,25 @@ set_asyncio_debug:
|
||||
selector:
|
||||
boolean:
|
||||
log_current_tasks:
|
||||
start_auditing_events:
|
||||
fields:
|
||||
events:
|
||||
required: true
|
||||
example: socket.connect
|
||||
selector:
|
||||
text:
|
||||
multiple: true
|
||||
verbose:
|
||||
default: false
|
||||
selector:
|
||||
boolean:
|
||||
filter:
|
||||
selector:
|
||||
text:
|
||||
summary_interval:
|
||||
selector:
|
||||
number:
|
||||
min: 1
|
||||
max: 3600
|
||||
unit_of_measurement: seconds
|
||||
stop_auditing_events:
|
||||
|
||||
@@ -94,6 +94,32 @@
|
||||
"log_current_tasks": {
|
||||
"name": "Log current asyncio tasks",
|
||||
"description": "Logs all the current asyncio tasks."
|
||||
},
|
||||
"start_auditing_events": {
|
||||
"name": "Add audited events",
|
||||
"description": "Add list of Python events to audit.",
|
||||
"fields": {
|
||||
"events": {
|
||||
"name": "Events",
|
||||
"description": "List of Python events to audit (see https://docs.python.org/3/library/audit_events.html#audit-events for the list of auditable events)."
|
||||
},
|
||||
"verbose": {
|
||||
"name": "Verbose",
|
||||
"description": "Include full stack trace of the audited event."
|
||||
},
|
||||
"filter": {
|
||||
"name": "Filter",
|
||||
"description": "Text that must be contained in the string representation of event's arguments (e.g. an IP address)."
|
||||
},
|
||||
"summary_interval": {
|
||||
"name": "Summary interval",
|
||||
"description": "If set, print aggregated list of events (per traceback and arguments) that occurred between the specified time interval."
|
||||
}
|
||||
}
|
||||
},
|
||||
"stop_auditing_events": {
|
||||
"name": "Stop auditing events",
|
||||
"description": "Stop Python events auditing."
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,8 +15,14 @@ import pytest
|
||||
from homeassistant.components.profiler import (
|
||||
_LRU_CACHE_WRAPPER_OBJECT,
|
||||
_SQLALCHEMY_LRU_OBJECT,
|
||||
AUDITED_EVENTS,
|
||||
AUDITING_HOOK_ADDED,
|
||||
CONF_ENABLED,
|
||||
CONF_EVENTS,
|
||||
CONF_FILTER,
|
||||
CONF_SECONDS,
|
||||
CONF_SUMMARY_INTERVAL,
|
||||
CONF_VERBOSE,
|
||||
SERVICE_DUMP_LOG_OBJECTS,
|
||||
SERVICE_LOG_CURRENT_TASKS,
|
||||
SERVICE_LOG_EVENT_LOOP_SCHEDULED,
|
||||
@@ -25,8 +31,10 @@ from homeassistant.components.profiler import (
|
||||
SERVICE_MEMORY,
|
||||
SERVICE_SET_ASYNCIO_DEBUG,
|
||||
SERVICE_START,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
SERVICE_START_LOG_OBJECT_SOURCES,
|
||||
SERVICE_START_LOG_OBJECTS,
|
||||
SERVICE_STOP_AUDITING_EVENTS,
|
||||
SERVICE_STOP_LOG_OBJECT_SOURCES,
|
||||
SERVICE_STOP_LOG_OBJECTS,
|
||||
)
|
||||
@@ -39,6 +47,13 @@ from homeassistant.util import dt as dt_util
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sys_addaudithook():
|
||||
"""Mock sys.addaudithook."""
|
||||
with patch("sys.addaudithook") as mock_addaudithook:
|
||||
yield mock_addaudithook
|
||||
|
||||
|
||||
async def test_basic_usage(hass: HomeAssistant, tmp_path: Path) -> None:
|
||||
"""Test we can setup and the service is registered."""
|
||||
test_dir = tmp_path / "profiles"
|
||||
@@ -447,3 +462,226 @@ async def test_set_asyncio_debug(
|
||||
|
||||
assert await hass.config_entries.async_unload(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
async def test_auditing_events_mocked(
|
||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, sys_addaudithook
|
||||
) -> None:
|
||||
"""Test service calls for events auditing with mocked sys.addaudithook."""
|
||||
|
||||
entry = MockConfigEntry(domain=DOMAIN)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.services.has_service(DOMAIN, SERVICE_START_AUDITING_EVENTS)
|
||||
|
||||
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is False
|
||||
|
||||
# Capture 'open' in non-verbose mode
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{CONF_EVENTS: ["open"], CONF_VERBOSE: False},
|
||||
blocking=True,
|
||||
)
|
||||
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is True
|
||||
event_open = hass.data[DOMAIN][AUDITED_EVENTS].get("open")
|
||||
assert event_open.verbose is False
|
||||
assert event_open.filter is None
|
||||
assert "Enabling auditing for event open" in caplog.text
|
||||
assert sys_addaudithook.call_count == 1
|
||||
|
||||
# Add 'import' in verbose mode
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{CONF_EVENTS: ["import"], CONF_VERBOSE: True},
|
||||
blocking=True,
|
||||
)
|
||||
event_import = hass.data[DOMAIN][AUDITED_EVENTS].get("import")
|
||||
assert event_import.verbose is True
|
||||
assert event_import.filter is None
|
||||
event_open = hass.data[DOMAIN][AUDITED_EVENTS].get("open")
|
||||
assert event_open.verbose is False
|
||||
assert event_open.filter is None
|
||||
assert "Enabling verbose auditing for event import" in caplog.text
|
||||
assert sys_addaudithook.call_count == 1
|
||||
|
||||
caplog.clear()
|
||||
|
||||
# Add 'exec' in verbose mode and set 'open' to verbose
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{CONF_EVENTS: ["exec", "open"], CONF_VERBOSE: True},
|
||||
blocking=True,
|
||||
)
|
||||
event_exec = hass.data[DOMAIN][AUDITED_EVENTS].get("exec")
|
||||
assert event_exec.verbose is True
|
||||
assert event_exec.filter is None
|
||||
event_import = hass.data[DOMAIN][AUDITED_EVENTS].get("import")
|
||||
assert event_import.verbose is True
|
||||
assert event_import.filter is None
|
||||
event_open = hass.data[DOMAIN][AUDITED_EVENTS].get("open")
|
||||
assert event_open.verbose is True
|
||||
assert event_open.filter is None
|
||||
assert sys_addaudithook.call_count == 1
|
||||
assert "Enabling verbose auditing for event exec" in caplog.text
|
||||
assert "Enabling verbose auditing for event open" in caplog.text
|
||||
|
||||
caplog.clear()
|
||||
|
||||
# Add filtering for 'exec' in verbose mode
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{CONF_EVENTS: ["exec"], CONF_VERBOSE: True, CONF_FILTER: "test"},
|
||||
blocking=True,
|
||||
)
|
||||
event_exec = hass.data[DOMAIN][AUDITED_EVENTS].get("exec")
|
||||
assert event_exec.filter == "test"
|
||||
assert hass.data[DOMAIN][AUDITED_EVENTS].get("import").filter is None
|
||||
assert hass.data[DOMAIN][AUDITED_EVENTS].get("open").filter is None
|
||||
assert sys_addaudithook.call_count == 1
|
||||
assert "Enabling verbose auditing for event exec with filter 'test'" in caplog.text
|
||||
|
||||
caplog.clear()
|
||||
|
||||
# Remove 'exec' from audited events
|
||||
await hass.services.async_call(DOMAIN, SERVICE_STOP_AUDITING_EVENTS, blocking=True)
|
||||
assert "exec" not in hass.data[DOMAIN][AUDITED_EVENTS]
|
||||
assert len(hass.data[DOMAIN][AUDITED_EVENTS]) == 0
|
||||
|
||||
caplog.clear()
|
||||
assert await hass.config_entries.async_unload(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert "Python auditing hook cannot be removed" in caplog.text
|
||||
|
||||
|
||||
async def test_auditing_events(
|
||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
"""Test actual Python events auditing."""
|
||||
entry = MockConfigEntry(domain=DOMAIN)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.services.has_service(DOMAIN, SERVICE_START_AUDITING_EVENTS)
|
||||
|
||||
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is False
|
||||
|
||||
# Capture 'exec' in non-verbose mode
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{CONF_EVENTS: ["exec"], CONF_VERBOSE: False},
|
||||
blocking=True,
|
||||
)
|
||||
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is True
|
||||
assert hass.data[DOMAIN][AUDITED_EVENTS].get("exec").verbose is False
|
||||
assert "Enabling auditing for event exec" in caplog.text
|
||||
caplog.clear()
|
||||
exec("1+1") # noqa: S102
|
||||
assert "Audited event: exec" in caplog.text
|
||||
assert "traceback" not in caplog.text
|
||||
|
||||
# Nothing should be logged after stopping
|
||||
await hass.services.async_call(DOMAIN, SERVICE_STOP_AUDITING_EVENTS, blocking=True)
|
||||
caplog.clear()
|
||||
exec("1+1") # noqa: S102
|
||||
assert "Audited event" not in caplog.text
|
||||
|
||||
# Test verbose capture
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{CONF_EVENTS: ["exec"], CONF_VERBOSE: True},
|
||||
blocking=True,
|
||||
)
|
||||
caplog.clear()
|
||||
exec("1+1") # noqa: S102
|
||||
assert "Audited event: exec" in caplog.text
|
||||
assert "traceback" in caplog.text
|
||||
assert 'exec("1+1")' in caplog.text
|
||||
|
||||
# Test filtered capture (matched filter)
|
||||
# the argument of exec is: <code object <module> at 0x7f2d5e9c9140, file "<string>", line 1>
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{
|
||||
CONF_EVENTS: ["exec"],
|
||||
CONF_VERBOSE: True,
|
||||
CONF_FILTER: 'file "<string>", line 1',
|
||||
},
|
||||
blocking=True,
|
||||
)
|
||||
caplog.clear()
|
||||
exec("1+1") # noqa: S102
|
||||
assert "Audited event: exec" in caplog.text
|
||||
assert "traceback" in caplog.text
|
||||
assert 'exec("1+1")' in caplog.text
|
||||
|
||||
# Test filtered capture (unmatched filter)
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{CONF_EVENTS: ["exec"], CONF_VERBOSE: True, CONF_FILTER: "random filter"},
|
||||
blocking=True,
|
||||
)
|
||||
caplog.clear()
|
||||
exec("1+1") # noqa: S102
|
||||
assert "Audited event: exec" not in caplog.text
|
||||
|
||||
assert await hass.config_entries.async_unload(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert "Python auditing hook cannot be removed" in caplog.text
|
||||
|
||||
# Nothing should be captured after unloading the integration
|
||||
caplog.clear()
|
||||
exec("1+1") # noqa: S102
|
||||
assert "Audited event" not in caplog.text
|
||||
|
||||
|
||||
async def test_auditing_events_summarized(
|
||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
"""Test actual Python events auditing with summarization enabled."""
|
||||
entry = MockConfigEntry(domain=DOMAIN)
|
||||
entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.services.has_service(DOMAIN, SERVICE_START_AUDITING_EVENTS)
|
||||
|
||||
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is False
|
||||
|
||||
# Capture 'exec' summarized every second
|
||||
await hass.services.async_call(
|
||||
DOMAIN,
|
||||
SERVICE_START_AUDITING_EVENTS,
|
||||
{CONF_EVENTS: ["exec"], CONF_SUMMARY_INTERVAL: timedelta(seconds=1)},
|
||||
blocking=True,
|
||||
)
|
||||
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is True
|
||||
assert "Enabling auditing for event exec" in caplog.text
|
||||
|
||||
caplog.clear()
|
||||
exec("1+1") # noqa: S102
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=2))
|
||||
await hass.async_block_till_done(wait_background_tasks=True)
|
||||
assert "Audited events summary" in caplog.text
|
||||
assert "1×: exec" in caplog.text
|
||||
|
||||
# Nothing should be logged after stopping
|
||||
await hass.services.async_call(DOMAIN, SERVICE_STOP_AUDITING_EVENTS, blocking=True)
|
||||
caplog.clear()
|
||||
exec("1+1") # noqa: S102
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=21))
|
||||
await hass.async_block_till_done(wait_background_tasks=True)
|
||||
assert "Audited event" not in caplog.text
|
||||
|
||||
Reference in New Issue
Block a user