Compare commits

...

5 Commits

Author SHA1 Message Date
Jan Čermák
631042237b Correctly check for auditing hook added when unloading 2025-09-16 18:31:16 +02:00
Jan Čermák
e70a3042c2 Implement summarized logging, simplify stop
* Implement interval-based logging, summarizing events by traceback
* Simplify stop service call to disable capturing the logs
2025-09-16 17:40:22 +02:00
Jan Čermák
c4eef15be7 Indent first line of formatted traceback 2025-09-16 13:30:06 +02:00
Jan Čermák
aaab467b29 Preserve args repr in variable 2025-09-16 13:30:05 +02:00
Jan Čermák
2d9b6e9c14 Add service for logging Python audit events to profiler integration
Python has "auditing" API which provides a way to track certain actions
which are executed in runtime. This can be useful for instance when user
wants to track what integrations are making outbound network connection
or accessing files on the filesystem, which may be needed also to
identify sources of resource leaks.

This PR adds services for configuring what audit events should be
logged, with optional filtering and full traceback capture.

The Audit API was defined by PEP 578 and is available in the standard
library since Python 3.8: https://peps.python.org/pep-0578

List of events which can be audited:
https://docs.python.org/3/library/audit_events.html
2025-09-16 13:30:04 +02:00
5 changed files with 461 additions and 3 deletions

View File

@@ -1,7 +1,7 @@
"""The profiler integration."""
import asyncio
from collections.abc import Generator
from collections.abc import Callable, Generator
import contextlib
from contextlib import suppress
from datetime import timedelta
@@ -12,7 +12,7 @@ import sys
import threading
import time
import traceback
from typing import Any, cast
from typing import Any, NamedTuple, cast
from lru import LRU
import voluptuous as vol
@@ -40,6 +40,8 @@ SERVICE_LOG_THREAD_FRAMES = "log_thread_frames"
SERVICE_LOG_EVENT_LOOP_SCHEDULED = "log_event_loop_scheduled"
SERVICE_SET_ASYNCIO_DEBUG = "set_asyncio_debug"
SERVICE_LOG_CURRENT_TASKS = "log_current_tasks"
SERVICE_START_AUDITING_EVENTS = "start_auditing_events"
SERVICE_STOP_AUDITING_EVENTS = "stop_auditing_events"
_LRU_CACHE_WRAPPER_OBJECT = _lru_cache_wrapper.__name__
_SQLALCHEMY_LRU_OBJECT = "LRUCache"
@@ -66,14 +68,34 @@ SERVICES = (
)
DEFAULT_SCAN_INTERVAL = timedelta(seconds=30)
DEFAULT_SUMMARY_INTERVAL = timedelta(seconds=30)
DEFAULT_MAX_OBJECTS = 5
CONF_ENABLED = "enabled"
CONF_SECONDS = "seconds"
CONF_MAX_OBJECTS = "max_objects"
CONF_EVENTS = "events"
CONF_FILTER = "filter"
CONF_SUMMARY_INTERVAL = "summary_interval"
CONF_VERBOSE = "verbose"
LOG_INTERVAL_SUB = "log_interval_subscription"
AUDITING_HOOK_ADDED = "auditing_hook_added"
AUDITED_EVENTS = "audited_events"
AUDIT_INTERVAL_SUB = "audit_interval_subscription"
AUDIT_SUMMARY = "audit_summary"
class AuditConfig(NamedTuple):
"""Auditing configuration of a single audit event type."""
verbose: bool
filter: str | None = None
type SummaryByArgs = dict[str, int]
type SummaryByTraceback = dict[str, SummaryByArgs]
_LOGGER = logging.getLogger(__name__)
@@ -84,7 +106,11 @@ async def async_setup_entry( # noqa: C901
) -> bool:
"""Set up Profiler from a config entry."""
lock = asyncio.Lock()
domain_data = hass.data[DOMAIN] = {}
domain_data = hass.data[DOMAIN] = {
AUDITING_HOOK_ADDED: False,
AUDITED_EVENTS: {},
}
async def _async_run_profile(call: ServiceCall) -> None:
async with lock:
@@ -273,6 +299,54 @@ async def async_setup_entry( # noqa: C901
base_logger.setLevel(logging.INFO)
hass.loop.set_debug(enabled)
async def _async_start_auditing_events(call: ServiceCall) -> None:
"""Configure auditing of Python events."""
events = call.data[CONF_EVENTS]
verbose = call.data.get(CONF_VERBOSE, False)
filter_ = call.data.get(CONF_FILTER)
summary_interval = call.data.get(CONF_SUMMARY_INTERVAL, None)
audited_events: dict[str, AuditConfig] = domain_data[AUDITED_EVENTS]
if summary_interval and AUDIT_INTERVAL_SUB in domain_data:
raise HomeAssistantError("Already summarizing events")
for event in events:
# Always log this at critical level so we know when
# it's been changed when reviewing logs
_LOGGER.critical(
"Enabling %sauditing for event %s%s",
"verbose " if verbose else "",
event,
f" with filter '{filter_}'" if filter_ else "",
)
audited_events[event] = AuditConfig(verbose=verbose, filter=filter_)
if not domain_data[AUDITING_HOOK_ADDED]:
_LOGGER.info("Adding Python audit hook function")
sys.addaudithook(_make_audit_hook(domain_data))
domain_data[AUDITING_HOOK_ADDED] = True
# Force this module's logger at least to DEBUG to see the events
if _LOGGER.getEffectiveLevel() > logging.DEBUG:
_LOGGER.setLevel(logging.DEBUG)
if summary_interval:
domain_data[AUDIT_SUMMARY] = {}
async def _log_events_summary_job(*_: Any) -> None:
await hass.async_add_executor_job(_log_events_summary, domain_data)
domain_data[AUDIT_INTERVAL_SUB] = async_track_time_interval(
hass, _log_events_summary_job, summary_interval
)
async def _async_stop_auditing_events(call: ServiceCall) -> None:
"""Stop auditing all Python events."""
domain_data[AUDITED_EVENTS].clear()
if AUDIT_INTERVAL_SUB in domain_data:
domain_data.pop(AUDIT_INTERVAL_SUB)()
domain_data.pop(AUDIT_SUMMARY)
async_register_admin_service(
hass,
DOMAIN,
@@ -382,6 +456,28 @@ async def async_setup_entry( # noqa: C901
_async_dump_current_tasks,
)
async_register_admin_service(
hass,
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
_async_start_auditing_events,
schema=vol.Schema(
{
vol.Required(CONF_EVENTS): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_VERBOSE, default=False): cv.boolean,
vol.Optional(CONF_FILTER): cv.string,
vol.Optional(CONF_SUMMARY_INTERVAL): cv.time_period,
}
),
)
async_register_admin_service(
hass,
DOMAIN,
SERVICE_STOP_AUDITING_EVENTS,
_async_stop_auditing_events,
)
return True
@@ -391,6 +487,13 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.services.async_remove(domain=DOMAIN, service=service)
if LOG_INTERVAL_SUB in hass.data[DOMAIN]:
hass.data[DOMAIN][LOG_INTERVAL_SUB]()
if hass.data[DOMAIN].get(AUDITING_HOOK_ADDED, False):
_LOGGER.warning(
"Python auditing hook cannot be removed, only suppressing logging"
)
hass.data[DOMAIN][AUDITED_EVENTS].clear()
if AUDIT_INTERVAL_SUB in hass.data[DOMAIN]:
hass.data[DOMAIN].pop(AUDIT_INTERVAL_SUB)()
hass.data.pop(DOMAIN)
return True
@@ -598,3 +701,66 @@ def _increase_repr_limit() -> Generator[None]:
finally:
arepr.maxstring = original_maxstring
arepr.maxother = original_maxother
def _make_audit_hook(domain_data: dict[str, Any]) -> Callable:
"""Create an audit hook function that logs events in audited_events."""
def _audit_hook(event, args):
audited_events: dict[str, AuditConfig] = domain_data.get(AUDITED_EVENTS, {})
summary: SummaryByTraceback | None = domain_data.get(AUDIT_SUMMARY)
if event in audited_events:
filter_ = audited_events[event].filter
args_repr = repr(args)
if filter_ and filter_ not in args_repr:
return
summarized = summary is not None
if summarized or audited_events[event].verbose:
stack = reversed(
traceback.format_stack()[:-1]
) # exclude this function call
tb = "".join(stack).strip()
if summarized:
event_info = f"{event} {args_repr}"
event_traceback_summary: SummaryByArgs = summary.setdefault(tb, {})
event_traceback_summary.setdefault(event_info, 0)
event_traceback_summary[event_info] += 1
return
if audited_events[event].verbose:
_LOGGER.debug(
"Audited event: %s %s, traceback (most recent call first):\n %s",
event,
args_repr,
tb,
)
else:
_LOGGER.debug("Audited event: %s %s", event, args_repr)
return _audit_hook
def _log_events_summary(domain_data: dict[str, Any]) -> None:
"""Log a summary of audited events."""
summary: SummaryByTraceback | None = domain_data.get(AUDIT_SUMMARY)
if not summary:
_LOGGER.debug("No audited events in the last period")
return
old_summary, domain_data[AUDIT_SUMMARY] = summary, {}
for tb, events in summary.items():
_LOGGER.debug(
"Audited events summary for traceback (most recent call first):"
"\n %s"
"\nEvent arguments:"
"\n%s",
tb,
"\n".join(f" - {count}×: {event}" for event, count in events.items()),
)
old_summary.clear()

View File

@@ -35,6 +35,12 @@
},
"set_asyncio_debug": {
"service": "mdi:bug-check"
},
"start_auditing_events": {
"service": "mdi:clipboard-alert-outline"
},
"stop_auditing_events": {
"service": "mdi:clipboard-alert"
}
}
}

View File

@@ -60,3 +60,25 @@ set_asyncio_debug:
selector:
boolean:
log_current_tasks:
start_auditing_events:
fields:
events:
required: true
example: socket.connect
selector:
text:
multiple: true
verbose:
default: false
selector:
boolean:
filter:
selector:
text:
summary_interval:
selector:
number:
min: 1
max: 3600
unit_of_measurement: seconds
stop_auditing_events:

View File

@@ -94,6 +94,32 @@
"log_current_tasks": {
"name": "Log current asyncio tasks",
"description": "Logs all the current asyncio tasks."
},
"start_auditing_events": {
"name": "Add audited events",
"description": "Add list of Python events to audit.",
"fields": {
"events": {
"name": "Events",
"description": "List of Python events to audit (see https://docs.python.org/3/library/audit_events.html#audit-events for the list of auditable events)."
},
"verbose": {
"name": "Verbose",
"description": "Include full stack trace of the audited event."
},
"filter": {
"name": "Filter",
"description": "Text that must be contained in the string representation of event's arguments (e.g. an IP address)."
},
"summary_interval": {
"name": "Summary interval",
"description": "If set, print aggregated list of events (per traceback and arguments) that occurred between the specified time interval."
}
}
},
"stop_auditing_events": {
"name": "Stop auditing events",
"description": "Stop Python events auditing."
}
}
}

View File

@@ -15,8 +15,14 @@ import pytest
from homeassistant.components.profiler import (
_LRU_CACHE_WRAPPER_OBJECT,
_SQLALCHEMY_LRU_OBJECT,
AUDITED_EVENTS,
AUDITING_HOOK_ADDED,
CONF_ENABLED,
CONF_EVENTS,
CONF_FILTER,
CONF_SECONDS,
CONF_SUMMARY_INTERVAL,
CONF_VERBOSE,
SERVICE_DUMP_LOG_OBJECTS,
SERVICE_LOG_CURRENT_TASKS,
SERVICE_LOG_EVENT_LOOP_SCHEDULED,
@@ -25,8 +31,10 @@ from homeassistant.components.profiler import (
SERVICE_MEMORY,
SERVICE_SET_ASYNCIO_DEBUG,
SERVICE_START,
SERVICE_START_AUDITING_EVENTS,
SERVICE_START_LOG_OBJECT_SOURCES,
SERVICE_START_LOG_OBJECTS,
SERVICE_STOP_AUDITING_EVENTS,
SERVICE_STOP_LOG_OBJECT_SOURCES,
SERVICE_STOP_LOG_OBJECTS,
)
@@ -39,6 +47,13 @@ from homeassistant.util import dt as dt_util
from tests.common import MockConfigEntry, async_fire_time_changed
@pytest.fixture
def sys_addaudithook():
"""Mock sys.addaudithook."""
with patch("sys.addaudithook") as mock_addaudithook:
yield mock_addaudithook
async def test_basic_usage(hass: HomeAssistant, tmp_path: Path) -> None:
"""Test we can setup and the service is registered."""
test_dir = tmp_path / "profiles"
@@ -447,3 +462,226 @@ async def test_set_asyncio_debug(
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
async def test_auditing_events_mocked(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, sys_addaudithook
) -> None:
"""Test service calls for events auditing with mocked sys.addaudithook."""
entry = MockConfigEntry(domain=DOMAIN)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert hass.services.has_service(DOMAIN, SERVICE_START_AUDITING_EVENTS)
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is False
# Capture 'open' in non-verbose mode
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{CONF_EVENTS: ["open"], CONF_VERBOSE: False},
blocking=True,
)
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is True
event_open = hass.data[DOMAIN][AUDITED_EVENTS].get("open")
assert event_open.verbose is False
assert event_open.filter is None
assert "Enabling auditing for event open" in caplog.text
assert sys_addaudithook.call_count == 1
# Add 'import' in verbose mode
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{CONF_EVENTS: ["import"], CONF_VERBOSE: True},
blocking=True,
)
event_import = hass.data[DOMAIN][AUDITED_EVENTS].get("import")
assert event_import.verbose is True
assert event_import.filter is None
event_open = hass.data[DOMAIN][AUDITED_EVENTS].get("open")
assert event_open.verbose is False
assert event_open.filter is None
assert "Enabling verbose auditing for event import" in caplog.text
assert sys_addaudithook.call_count == 1
caplog.clear()
# Add 'exec' in verbose mode and set 'open' to verbose
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{CONF_EVENTS: ["exec", "open"], CONF_VERBOSE: True},
blocking=True,
)
event_exec = hass.data[DOMAIN][AUDITED_EVENTS].get("exec")
assert event_exec.verbose is True
assert event_exec.filter is None
event_import = hass.data[DOMAIN][AUDITED_EVENTS].get("import")
assert event_import.verbose is True
assert event_import.filter is None
event_open = hass.data[DOMAIN][AUDITED_EVENTS].get("open")
assert event_open.verbose is True
assert event_open.filter is None
assert sys_addaudithook.call_count == 1
assert "Enabling verbose auditing for event exec" in caplog.text
assert "Enabling verbose auditing for event open" in caplog.text
caplog.clear()
# Add filtering for 'exec' in verbose mode
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{CONF_EVENTS: ["exec"], CONF_VERBOSE: True, CONF_FILTER: "test"},
blocking=True,
)
event_exec = hass.data[DOMAIN][AUDITED_EVENTS].get("exec")
assert event_exec.filter == "test"
assert hass.data[DOMAIN][AUDITED_EVENTS].get("import").filter is None
assert hass.data[DOMAIN][AUDITED_EVENTS].get("open").filter is None
assert sys_addaudithook.call_count == 1
assert "Enabling verbose auditing for event exec with filter 'test'" in caplog.text
caplog.clear()
# Remove 'exec' from audited events
await hass.services.async_call(DOMAIN, SERVICE_STOP_AUDITING_EVENTS, blocking=True)
assert "exec" not in hass.data[DOMAIN][AUDITED_EVENTS]
assert len(hass.data[DOMAIN][AUDITED_EVENTS]) == 0
caplog.clear()
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert "Python auditing hook cannot be removed" in caplog.text
async def test_auditing_events(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test actual Python events auditing."""
entry = MockConfigEntry(domain=DOMAIN)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert hass.services.has_service(DOMAIN, SERVICE_START_AUDITING_EVENTS)
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is False
# Capture 'exec' in non-verbose mode
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{CONF_EVENTS: ["exec"], CONF_VERBOSE: False},
blocking=True,
)
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is True
assert hass.data[DOMAIN][AUDITED_EVENTS].get("exec").verbose is False
assert "Enabling auditing for event exec" in caplog.text
caplog.clear()
exec("1+1") # noqa: S102
assert "Audited event: exec" in caplog.text
assert "traceback" not in caplog.text
# Nothing should be logged after stopping
await hass.services.async_call(DOMAIN, SERVICE_STOP_AUDITING_EVENTS, blocking=True)
caplog.clear()
exec("1+1") # noqa: S102
assert "Audited event" not in caplog.text
# Test verbose capture
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{CONF_EVENTS: ["exec"], CONF_VERBOSE: True},
blocking=True,
)
caplog.clear()
exec("1+1") # noqa: S102
assert "Audited event: exec" in caplog.text
assert "traceback" in caplog.text
assert 'exec("1+1")' in caplog.text
# Test filtered capture (matched filter)
# the argument of exec is: <code object <module> at 0x7f2d5e9c9140, file "<string>", line 1>
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{
CONF_EVENTS: ["exec"],
CONF_VERBOSE: True,
CONF_FILTER: 'file "<string>", line 1',
},
blocking=True,
)
caplog.clear()
exec("1+1") # noqa: S102
assert "Audited event: exec" in caplog.text
assert "traceback" in caplog.text
assert 'exec("1+1")' in caplog.text
# Test filtered capture (unmatched filter)
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{CONF_EVENTS: ["exec"], CONF_VERBOSE: True, CONF_FILTER: "random filter"},
blocking=True,
)
caplog.clear()
exec("1+1") # noqa: S102
assert "Audited event: exec" not in caplog.text
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert "Python auditing hook cannot be removed" in caplog.text
# Nothing should be captured after unloading the integration
caplog.clear()
exec("1+1") # noqa: S102
assert "Audited event" not in caplog.text
async def test_auditing_events_summarized(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test actual Python events auditing with summarization enabled."""
entry = MockConfigEntry(domain=DOMAIN)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert hass.services.has_service(DOMAIN, SERVICE_START_AUDITING_EVENTS)
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is False
# Capture 'exec' summarized every second
await hass.services.async_call(
DOMAIN,
SERVICE_START_AUDITING_EVENTS,
{CONF_EVENTS: ["exec"], CONF_SUMMARY_INTERVAL: timedelta(seconds=1)},
blocking=True,
)
assert hass.data[DOMAIN][AUDITING_HOOK_ADDED] is True
assert "Enabling auditing for event exec" in caplog.text
caplog.clear()
exec("1+1") # noqa: S102
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=2))
await hass.async_block_till_done(wait_background_tasks=True)
assert "Audited events summary" in caplog.text
assert "1×: exec" in caplog.text
# Nothing should be logged after stopping
await hass.services.async_call(DOMAIN, SERVICE_STOP_AUDITING_EVENTS, blocking=True)
caplog.clear()
exec("1+1") # noqa: S102
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=21))
await hass.async_block_till_done(wait_background_tasks=True)
assert "Audited event" not in caplog.text