From 29bda196b5e0a90a2bea7e1797742236114afc1c Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 2 May 2022 23:53:56 -0500 Subject: [PATCH] Break apart recorder into tasks and core modules (#71222) --- .strict-typing | 2 + homeassistant/components/recorder/__init__.py | 1304 +---------------- homeassistant/components/recorder/const.py | 9 + homeassistant/components/recorder/core.py | 1082 ++++++++++++++ homeassistant/components/recorder/tasks.py | 250 ++++ mypy.ini | 22 + tests/components/history/test_init.py | 25 +- tests/components/recorder/test_history.py | 83 +- tests/components/recorder/test_init.py | 50 +- tests/components/recorder/test_migrate.py | 34 +- tests/components/recorder/test_purge.py | 33 +- tests/components/recorder/test_statistics.py | 12 +- tests/components/recorder/test_util.py | 6 +- .../components/recorder/test_websocket_api.py | 7 +- tests/components/sensor/test_recorder.py | 68 +- 15 files changed, 1583 insertions(+), 1404 deletions(-) create mode 100644 homeassistant/components/recorder/core.py create mode 100644 homeassistant/components/recorder/tasks.py diff --git a/.strict-typing b/.strict-typing index df54a082119..2915f398953 100644 --- a/.strict-typing +++ b/.strict-typing @@ -179,6 +179,7 @@ homeassistant.components.rdw.* homeassistant.components.recollect_waste.* homeassistant.components.recorder homeassistant.components.recorder.const +homeassistant.components.recorder.core homeassistant.components.recorder.backup homeassistant.components.recorder.executor homeassistant.components.recorder.history @@ -189,6 +190,7 @@ homeassistant.components.recorder.repack homeassistant.components.recorder.run_history homeassistant.components.recorder.statistics homeassistant.components.recorder.system_health +homeassistant.components.recorder.tasks homeassistant.components.recorder.util homeassistant.components.recorder.websocket_api homeassistant.components.remote.* diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 1d81495c3a5..f9d462abeea 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -1,55 +1,18 @@ """Support for recording details.""" from __future__ import annotations -import abc -import asyncio -from collections.abc import Callable, Iterable -from dataclasses import dataclass -from datetime import datetime, timedelta import logging -import queue -import sqlite3 -import threading -import time -from typing import Any, TypeVar, cast +from typing import Any -from lru import LRU # pylint: disable=no-name-in-module -from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select -from sqlalchemy.engine import Engine -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.orm import scoped_session, sessionmaker -from sqlalchemy.orm.session import Session import voluptuous as vol -from homeassistant.components import persistent_notification -from homeassistant.const import ( - ATTR_ENTITY_ID, - CONF_EXCLUDE, - EVENT_HOMEASSISTANT_FINAL_WRITE, - EVENT_HOMEASSISTANT_STARTED, - EVENT_HOMEASSISTANT_STOP, - EVENT_STATE_CHANGED, - MATCH_ALL, -) -from homeassistant.core import ( - CALLBACK_TYPE, - CoreState, - Event, - HomeAssistant, - ServiceCall, - callback, -) +from homeassistant.const import CONF_EXCLUDE, EVENT_STATE_CHANGED +from homeassistant.core import HomeAssistant, ServiceCall, callback import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entityfilter import ( 
INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA, INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER, convert_include_exclude_filter, - generate_filter, -) -from homeassistant.helpers.event import ( - async_track_time_change, - async_track_time_interval, - async_track_utc_time_change, ) from homeassistant.helpers.integration_platform import ( async_process_integration_platforms, @@ -57,58 +20,29 @@ from homeassistant.helpers.integration_platform import ( from homeassistant.helpers.service import async_extract_entity_ids from homeassistant.helpers.typing import ConfigType from homeassistant.loader import bind_hass -import homeassistant.util.dt as dt_util -from . import history, migration, purge, statistics, websocket_api +from . import history, statistics, websocket_api from .const import ( + ATTR_APPLY_FILTER, + ATTR_KEEP_DAYS, + ATTR_REPACK, CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, - DB_WORKER_PREFIX, DOMAIN, - MAX_QUEUE_BACKLOG, + EXCLUDE_ATTRIBUTES, SQLITE_URL_PREFIX, ) -from .executor import DBInterruptibleThreadPoolExecutor -from .models import ( - Base, - EventData, - Events, - StateAttributes, - States, - StatisticData, - StatisticMetaData, - StatisticsRuns, - process_timestamp, -) -from .pool import POOL_SIZE, MutexPool, RecorderPool -from .queries import find_shared_attributes_id, find_shared_data_id -from .run_history import RunHistory -from .util import ( - dburl_to_path, - end_incomplete_runs, - is_second_sunday, - move_away_broken_database, - periodic_db_cleanups, - session_scope, - setup_connection_for_dialect, - validate_or_move_away_sqlite_database, - write_lock_db_sqlite, -) +from .core import Recorder +from .tasks import AddRecorderPlatformTask _LOGGER = logging.getLogger(__name__) -T = TypeVar("T") - -EXCLUDE_ATTRIBUTES = f"{DOMAIN}_exclude_attributes_by_domain" SERVICE_PURGE = "purge" SERVICE_PURGE_ENTITIES = "purge_entities" SERVICE_ENABLE = "enable" SERVICE_DISABLE = "disable" -ATTR_KEEP_DAYS = "keep_days" -ATTR_REPACK = "repack" -ATTR_APPLY_FILTER = "apply_filter" SERVICE_PURGE_SCHEMA = vol.Schema( { @@ -138,26 +72,6 @@ DEFAULT_DB_INTEGRITY_CHECK = True DEFAULT_DB_MAX_RETRIES = 10 DEFAULT_DB_RETRY_WAIT = 3 DEFAULT_COMMIT_INTERVAL = 1 -KEEPALIVE_TIME = 30 - -# Controls how often we clean up -# States and Events objects -EXPIRE_AFTER_COMMITS = 120 - -# The number of attribute ids to cache in memory -# -# Based on: -# - The number of overlapping attributes -# - How frequently states with overlapping attributes will change -# - How much memory our low end hardware has -STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048 -EVENT_DATA_ID_CACHE_SIZE = 2048 - -SHUTDOWN_TASK = object() - - -DB_LOCK_TIMEOUT = 30 -DB_LOCK_QUEUE_CHECK_TIMEOUT = 1 CONF_AUTO_PURGE = "auto_purge" CONF_AUTO_REPACK = "auto_repack" @@ -169,8 +83,6 @@ CONF_PURGE_INTERVAL = "purge_interval" CONF_EVENT_TYPES = "event_types" CONF_COMMIT_INTERVAL = "commit_interval" -INVALIDATED_ERR = "Database connection invalidated" -CONNECTIVITY_ERR = "Error in database connectivity during commit" EXCLUDE_SCHEMA = INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER.extend( {vol.Optional(CONF_EVENT_TYPES): vol.All(cv.ensure_list, [cv.string])} @@ -227,10 +139,6 @@ CONFIG_SCHEMA = vol.Schema( ) -# Pool size must accommodate Recorder thread + All db executors -MAX_DB_EXECUTOR_WORKERS = POOL_SIZE - 1 - - def get_instance(hass: HomeAssistant) -> Recorder: """Get the recorder instance.""" instance: Recorder = hass.data[DATA_INSTANCE] @@ -351,1195 +259,3 @@ def _async_register_services(hass: HomeAssistant, instance: Recorder) -> None: async_handle_disable_service, 
schema=SERVICE_DISABLE_SCHEMA, ) - - -class RecorderTask(abc.ABC): - """ABC for recorder tasks.""" - - commit_before = True - - @abc.abstractmethod - def run(self, instance: Recorder) -> None: - """Handle the task.""" - - -@dataclass -class ClearStatisticsTask(RecorderTask): - """Object to store statistics_ids which for which to remove statistics.""" - - statistic_ids: list[str] - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - statistics.clear_statistics(instance, self.statistic_ids) - - -@dataclass -class UpdateStatisticsMetadataTask(RecorderTask): - """Object to store statistics_id and unit for update of statistics metadata.""" - - statistic_id: str - unit_of_measurement: str | None - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - statistics.update_statistics_metadata( - instance, self.statistic_id, self.unit_of_measurement - ) - - -@dataclass -class PurgeTask(RecorderTask): - """Object to store information about purge task.""" - - purge_before: datetime - repack: bool - apply_filter: bool - - def run(self, instance: Recorder) -> None: - """Purge the database.""" - assert instance.get_session is not None - - if purge.purge_old_data( - instance, self.purge_before, self.repack, self.apply_filter - ): - with instance.get_session() as session: - instance.run_history.load_from_db(session) - # We always need to do the db cleanups after a purge - # is finished to ensure the WAL checkpoint and other - # tasks happen after a vacuum. - periodic_db_cleanups(instance) - return - # Schedule a new purge task if this one didn't finish - instance.queue.put(PurgeTask(self.purge_before, self.repack, self.apply_filter)) - - -@dataclass -class PurgeEntitiesTask(RecorderTask): - """Object to store entity information about purge task.""" - - entity_filter: Callable[[str], bool] - - def run(self, instance: Recorder) -> None: - """Purge entities from the database.""" - if purge.purge_entity_data(instance, self.entity_filter): - return - # Schedule a new purge task if this one didn't finish - instance.queue.put(PurgeEntitiesTask(self.entity_filter)) - - -@dataclass -class PerodicCleanupTask(RecorderTask): - """An object to insert into the recorder to trigger cleanup tasks when auto purge is disabled.""" - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - periodic_db_cleanups(instance) - - -@dataclass -class StatisticsTask(RecorderTask): - """An object to insert into the recorder queue to run a statistics task.""" - - start: datetime - - def run(self, instance: Recorder) -> None: - """Run statistics task.""" - if statistics.compile_statistics(instance, self.start): - return - # Schedule a new statistics task if this one didn't finish - instance.queue.put(StatisticsTask(self.start)) - - -@dataclass -class ExternalStatisticsTask(RecorderTask): - """An object to insert into the recorder queue to run an external statistics task.""" - - metadata: StatisticMetaData - statistics: Iterable[StatisticData] - - def run(self, instance: Recorder) -> None: - """Run statistics task.""" - if statistics.add_external_statistics(instance, self.metadata, self.statistics): - return - # Schedule a new statistics task if this one didn't finish - instance.queue.put(ExternalStatisticsTask(self.metadata, self.statistics)) - - -@dataclass -class AdjustStatisticsTask(RecorderTask): - """An object to insert into the recorder queue to run an adjust statistics task.""" - - statistic_id: str - start_time: datetime - sum_adjustment: float - - def run(self, instance: 
Recorder) -> None: - """Run statistics task.""" - if statistics.adjust_statistics( - instance, - self.statistic_id, - self.start_time, - self.sum_adjustment, - ): - return - # Schedule a new adjust statistics task if this one didn't finish - instance.queue.put( - AdjustStatisticsTask( - self.statistic_id, self.start_time, self.sum_adjustment - ) - ) - - -@dataclass -class WaitTask(RecorderTask): - """An object to insert into the recorder queue to tell it set the _queue_watch event.""" - - commit_before = False - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - instance._queue_watch.set() # pylint: disable=[protected-access] - - -@dataclass -class DatabaseLockTask(RecorderTask): - """An object to insert into the recorder queue to prevent writes to the database.""" - - database_locked: asyncio.Event - database_unlock: threading.Event - queue_overflow: bool - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - instance._lock_database(self) # pylint: disable=[protected-access] - - -@dataclass -class StopTask(RecorderTask): - """An object to insert into the recorder queue to stop the event handler.""" - - commit_before = False - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - instance.stop_requested = True - - -@dataclass -class EventTask(RecorderTask): - """An event to be processed.""" - - event: Event - commit_before = False - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - # pylint: disable-next=[protected-access] - instance._process_one_event(self.event) - - -@dataclass -class KeepAliveTask(RecorderTask): - """A keep alive to be sent.""" - - commit_before = False - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - # pylint: disable-next=[protected-access] - instance._send_keep_alive() - - -@dataclass -class CommitTask(RecorderTask): - """Commit the event session.""" - - commit_before = False - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - # pylint: disable-next=[protected-access] - instance._commit_event_session_or_retry() - - -@dataclass -class AddRecorderPlatformTask(RecorderTask): - """Add a recorder platform.""" - - domain: str - platform: Any - commit_before = False - - def run(self, instance: Recorder) -> None: - """Handle the task.""" - hass = instance.hass - domain = self.domain - platform = self.platform - - platforms: dict[str, Any] = hass.data[DOMAIN] - platforms[domain] = platform - if hasattr(self.platform, "exclude_attributes"): - hass.data[EXCLUDE_ATTRIBUTES][domain] = platform.exclude_attributes(hass) - - -COMMIT_TASK = CommitTask() -KEEP_ALIVE_TASK = KeepAliveTask() - - -class Recorder(threading.Thread): - """A threaded recorder class.""" - - stop_requested: bool - - def __init__( - self, - hass: HomeAssistant, - auto_purge: bool, - auto_repack: bool, - keep_days: int, - commit_interval: int, - uri: str, - db_max_retries: int, - db_retry_wait: int, - entity_filter: Callable[[str], bool], - exclude_t: list[str], - exclude_attributes_by_domain: dict[str, set[str]], - ) -> None: - """Initialize the recorder.""" - threading.Thread.__init__(self, name="Recorder") - - self.hass = hass - self.auto_purge = auto_purge - self.auto_repack = auto_repack - self.keep_days = keep_days - self._hass_started: asyncio.Future[object] = asyncio.Future() - self.commit_interval = commit_interval - self.queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue() - self.db_url = uri - self.db_max_retries = db_max_retries - self.db_retry_wait = db_retry_wait - 
self.async_db_ready: asyncio.Future[bool] = asyncio.Future() - self.async_recorder_ready = asyncio.Event() - self._queue_watch = threading.Event() - self.engine: Engine | None = None - self.run_history = RunHistory() - - self.entity_filter = entity_filter - self.exclude_t = exclude_t - - self._commits_without_expire = 0 - self._old_states: dict[str, States] = {} - self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE) - self._event_data_ids: LRU = LRU(EVENT_DATA_ID_CACHE_SIZE) - self._pending_state_attributes: dict[str, StateAttributes] = {} - self._pending_event_data: dict[str, EventData] = {} - self._pending_expunge: list[States] = [] - self.event_session: Session | None = None - self.get_session: Callable[[], Session] | None = None - self._completed_first_database_setup: bool | None = None - self._event_listener: CALLBACK_TYPE | None = None - self.async_migration_event = asyncio.Event() - self.migration_in_progress = False - self._queue_watcher: CALLBACK_TYPE | None = None - self._db_supports_row_number = True - self._database_lock_task: DatabaseLockTask | None = None - self._db_executor: DBInterruptibleThreadPoolExecutor | None = None - self._exclude_attributes_by_domain = exclude_attributes_by_domain - - self._keep_alive_listener: CALLBACK_TYPE | None = None - self._commit_listener: CALLBACK_TYPE | None = None - self._periodic_listener: CALLBACK_TYPE | None = None - self._nightly_listener: CALLBACK_TYPE | None = None - self.enabled = True - - def set_enable(self, enable: bool) -> None: - """Enable or disable recording events and states.""" - self.enabled = enable - - @callback - def async_start_executor(self) -> None: - """Start the executor.""" - self._db_executor = DBInterruptibleThreadPoolExecutor( - thread_name_prefix=DB_WORKER_PREFIX, - max_workers=MAX_DB_EXECUTOR_WORKERS, - shutdown_hook=self._shutdown_pool, - ) - - def _shutdown_pool(self) -> None: - """Close the dbpool connections in the current thread.""" - if self.engine and hasattr(self.engine.pool, "shutdown"): - self.engine.pool.shutdown() - - @callback - def async_initialize(self) -> None: - """Initialize the recorder.""" - self._event_listener = self.hass.bus.async_listen( - MATCH_ALL, self.event_listener, event_filter=self._async_event_filter - ) - self._queue_watcher = async_track_time_interval( - self.hass, self._async_check_queue, timedelta(minutes=10) - ) - - @callback - def _async_keep_alive(self, now: datetime) -> None: - """Queue a keep alive.""" - if self._event_listener: - self.queue.put(KEEP_ALIVE_TASK) - - @callback - def _async_commit(self, now: datetime) -> None: - """Queue a commit.""" - if ( - self._event_listener - and not self._database_lock_task - and self._event_session_has_pending_writes() - ): - self.queue.put(COMMIT_TASK) - - @callback - def async_add_executor_job( - self, target: Callable[..., T], *args: Any - ) -> asyncio.Future[T]: - """Add an executor job from within the event loop.""" - return self.hass.loop.run_in_executor(self._db_executor, target, *args) - - def _stop_executor(self) -> None: - """Stop the executor.""" - assert self._db_executor is not None - self._db_executor.shutdown() - self._db_executor = None - - @callback - def _async_check_queue(self, *_: Any) -> None: - """Periodic check of the queue size to ensure we do not exaust memory. - - The queue grows during migraton or if something really goes wrong. 
- """ - size = self.queue.qsize() - _LOGGER.debug("Recorder queue size is: %s", size) - if size <= MAX_QUEUE_BACKLOG: - return - _LOGGER.error( - "The recorder backlog queue reached the maximum size of %s events; " - "usually, the system is CPU bound, I/O bound, or the database " - "is corrupt due to a disk problem; The recorder will stop " - "recording events to avoid running out of memory", - MAX_QUEUE_BACKLOG, - ) - self._async_stop_queue_watcher_and_event_listener() - - @callback - def _async_stop_queue_watcher_and_event_listener(self) -> None: - """Stop watching the queue and listening for events.""" - if self._queue_watcher: - self._queue_watcher() - self._queue_watcher = None - if self._event_listener: - self._event_listener() - self._event_listener = None - - @callback - def _async_stop_listeners(self) -> None: - """Stop listeners.""" - self._async_stop_queue_watcher_and_event_listener() - if self._keep_alive_listener: - self._keep_alive_listener() - self._keep_alive_listener = None - if self._commit_listener: - self._commit_listener() - self._commit_listener = None - if self._nightly_listener: - self._nightly_listener() - self._nightly_listener = None - if self._periodic_listener: - self._periodic_listener() - self._periodic_listener = None - - @callback - def _async_event_filter(self, event: Event) -> bool: - """Filter events.""" - if event.event_type in self.exclude_t: - return False - - if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None: - return True - - if isinstance(entity_id, str): - return self.entity_filter(entity_id) - - if isinstance(entity_id, list): - for eid in entity_id: - if self.entity_filter(eid): - return True - return False - - # Unknown what it is. - return True - - def do_adhoc_purge(self, **kwargs: Any) -> None: - """Trigger an adhoc purge retaining keep_days worth of data.""" - keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days) - repack = cast(bool, kwargs[ATTR_REPACK]) - apply_filter = cast(bool, kwargs[ATTR_APPLY_FILTER]) - - purge_before = dt_util.utcnow() - timedelta(days=keep_days) - self.queue.put(PurgeTask(purge_before, repack, apply_filter)) - - def do_adhoc_purge_entities( - self, entity_ids: set[str], domains: list[str], entity_globs: list[str] - ) -> None: - """Trigger an adhoc purge of requested entities.""" - entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs) - self.queue.put(PurgeEntitiesTask(entity_filter)) - - def do_adhoc_statistics(self, **kwargs: Any) -> None: - """Trigger an adhoc statistics run.""" - if not (start := kwargs.get("start")): - start = statistics.get_start_time() - self.queue.put(StatisticsTask(start)) - - @callback - def async_register(self) -> None: - """Post connection initialize.""" - - def _empty_queue(event: Event) -> None: - """Empty the queue if its still present at final write.""" - - # If the queue is full of events to be processed because - # the database is so broken that every event results in a retry - # we will never be able to get though the events to shutdown in time. - # - # We drain all the events in the queue and then insert - # an empty one to ensure the next thing the recorder sees - # is a request to shutdown. 
- while True: - try: - self.queue.get_nowait() - except queue.Empty: - break - self.queue.put(StopTask()) - - self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue) - - async def _async_shutdown(event: Event) -> None: - """Shut down the Recorder.""" - if not self._hass_started.done(): - self._hass_started.set_result(SHUTDOWN_TASK) - self.queue.put(StopTask()) - self._async_stop_listeners() - await self.hass.async_add_executor_job(self.join) - - self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown) - - if self.hass.state == CoreState.running: - self._hass_started.set_result(None) - return - - @callback - def _async_hass_started(event: Event) -> None: - """Notify that hass has started.""" - self._hass_started.set_result(None) - - self.hass.bus.async_listen_once( - EVENT_HOMEASSISTANT_STARTED, _async_hass_started - ) - - @callback - def async_connection_failed(self) -> None: - """Connect failed tasks.""" - self.async_db_ready.set_result(False) - persistent_notification.async_create( - self.hass, - "The recorder could not start, check [the logs](/config/logs)", - "Recorder", - ) - self._async_stop_listeners() - - @callback - def async_connection_success(self) -> None: - """Connect success tasks.""" - self.async_db_ready.set_result(True) - self.async_start_executor() - - @callback - def _async_recorder_ready(self) -> None: - """Finish start and mark recorder ready.""" - self._async_setup_periodic_tasks() - self.async_recorder_ready.set() - - @callback - def async_nightly_tasks(self, now: datetime) -> None: - """Trigger the purge.""" - if self.auto_purge: - # Purge will schedule the periodic cleanups - # after it completes to ensure it does not happen - # until after the database is vacuumed - repack = self.auto_repack and is_second_sunday(now) - purge_before = dt_util.utcnow() - timedelta(days=self.keep_days) - self.queue.put(PurgeTask(purge_before, repack=repack, apply_filter=False)) - else: - self.queue.put(PerodicCleanupTask()) - - @callback - def async_periodic_statistics(self, now: datetime) -> None: - """Trigger the statistics run. 
- - Short term statistics run every 5 minutes - """ - start = statistics.get_start_time() - self.queue.put(StatisticsTask(start)) - - @callback - def async_adjust_statistics( - self, statistic_id: str, start_time: datetime, sum_adjustment: float - ) -> None: - """Adjust statistics.""" - self.queue.put(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment)) - - @callback - def async_clear_statistics(self, statistic_ids: list[str]) -> None: - """Clear statistics for a list of statistic_ids.""" - self.queue.put(ClearStatisticsTask(statistic_ids)) - - @callback - def async_update_statistics_metadata( - self, statistic_id: str, unit_of_measurement: str | None - ) -> None: - """Update statistics metadata for a statistic_id.""" - self.queue.put(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement)) - - @callback - def async_external_statistics( - self, metadata: StatisticMetaData, stats: Iterable[StatisticData] - ) -> None: - """Schedule external statistics.""" - self.queue.put(ExternalStatisticsTask(metadata, stats)) - - @callback - def using_sqlite(self) -> bool: - """Return if recorder uses sqlite as the engine.""" - return bool(self.engine and self.engine.dialect.name == "sqlite") - - @callback - def _async_setup_periodic_tasks(self) -> None: - """Prepare periodic tasks.""" - if self.hass.is_stopping or not self.get_session: - # Home Assistant is shutting down - return - - # If the db is using a socket connection, we need to keep alive - # to prevent errors from unexpected disconnects - if not self.using_sqlite(): - self._keep_alive_listener = async_track_time_interval( - self.hass, self._async_keep_alive, timedelta(seconds=KEEPALIVE_TIME) - ) - - # If the commit interval is not 0, we need to commit periodically - if self.commit_interval: - self._commit_listener = async_track_time_interval( - self.hass, self._async_commit, timedelta(seconds=self.commit_interval) - ) - - # Run nightly tasks at 4:12am - self._nightly_listener = async_track_time_change( - self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0 - ) - - # Compile short term statistics every 5 minutes - self._periodic_listener = async_track_utc_time_change( - self.hass, self.async_periodic_statistics, minute=range(0, 60, 5), second=10 - ) - - async def _async_wait_for_started(self) -> object | None: - """Wait for the hass started future.""" - return await self._hass_started - - def _wait_startup_or_shutdown(self) -> object | None: - """Wait for startup or shutdown before starting.""" - return asyncio.run_coroutine_threadsafe( - self._async_wait_for_started(), self.hass.loop - ).result() - - def run(self) -> None: - """Start processing events to save.""" - current_version = self._setup_recorder() - - if current_version is None: - self.hass.add_job(self.async_connection_failed) - return - - schema_is_current = migration.schema_is_current(current_version) - if schema_is_current: - self._setup_run() - else: - self.migration_in_progress = True - - self.hass.add_job(self.async_connection_success) - - # If shutdown happened before Home Assistant finished starting - if self._wait_startup_or_shutdown() is SHUTDOWN_TASK: - self.migration_in_progress = False - # Make sure we cleanly close the run if - # we restart before startup finishes - self._shutdown() - return - - # We wait to start the migration until startup has finished - # since it can be cpu intensive and we do not want it to compete - # with startup which is also cpu intensive - if not schema_is_current: - if 
self._migrate_schema_and_setup_run(current_version):
-                if not self._event_listener:
-                    # If the schema migration takes so long that the end
-                    # queue watcher safety kicks in because MAX_QUEUE_BACKLOG
-                    # is reached, we need to reinitialize the listener.
-                    self.hass.add_job(self.async_initialize)
-            else:
-                persistent_notification.create(
-                    self.hass,
-                    "The database migration failed, check [the logs](/config/logs).",
-                    "Database Migration Failed",
-                    "recorder_database_migration",
-                )
-                self._shutdown()
-                return
-
-        _LOGGER.debug("Recorder processing the queue")
-        self.hass.add_job(self._async_recorder_ready)
-        self._run_event_loop()
-
-    def _run_event_loop(self) -> None:
-        """Run the event loop for the recorder."""
-        # Use a session for the event read loop
-        # with a commit every time the event time
-        # has changed. This reduces the disk io.
-        self.stop_requested = False
-        while not self.stop_requested:
-            task = self.queue.get()
-            _LOGGER.debug("Processing task: %s", task)
-            try:
-                self._process_one_task_or_recover(task)
-            except Exception as err:  # pylint: disable=broad-except
-                _LOGGER.exception("Error while processing event %s: %s", task, err)
-
-        self._shutdown()
-
-    def _process_one_task_or_recover(self, task: RecorderTask) -> None:
-        """Process an event, reconnect, or recover a malformed database."""
-        try:
-            # If its not an event, commit everything
-            # that is pending before running the task
-            if task.commit_before:
-                self._commit_event_session_or_retry()
-            return task.run(self)
-        except exc.DatabaseError as err:
-            if self._handle_database_error(err):
-                return
-            _LOGGER.exception(
-                "Unhandled database error while processing task %s: %s", task, err
-            )
-        except SQLAlchemyError as err:
-            _LOGGER.exception("SQLAlchemyError error processing task %s: %s", task, err)
-
-        # Reset the session if an SQLAlchemyError (including DatabaseError)
-        # happens to rollback and recover
-        self._reopen_event_session()
-
-    def _setup_recorder(self) -> None | int:
-        """Create connect to the database and get the schema version."""
-        tries = 1
-
-        while tries <= self.db_max_retries:
-            try:
-                self._setup_connection()
-                return migration.get_schema_version(self)
-            except Exception as err:  # pylint: disable=broad-except
-                _LOGGER.exception(
-                    "Error during connection setup: %s (retrying in %s seconds)",
-                    err,
-                    self.db_retry_wait,
-                )
-                tries += 1
-                time.sleep(self.db_retry_wait)
-
-        return None
-
-    @callback
-    def _async_migration_started(self) -> None:
-        """Set the migration started event."""
-        self.async_migration_event.set()
-
-    def _migrate_schema_and_setup_run(self, current_version: int) -> bool:
-        """Migrate schema to the latest version."""
-        persistent_notification.create(
-            self.hass,
-            "System performance will temporarily degrade during the database upgrade. Do not power down or restart the system until the upgrade completes. 
Integrations that read the database, such as logbook and history, may return inconsistent results until the upgrade completes.", - "Database upgrade in progress", - "recorder_database_migration", - ) - self.hass.add_job(self._async_migration_started) - - try: - migration.migrate_schema(self, current_version) - except exc.DatabaseError as err: - if self._handle_database_error(err): - return True - _LOGGER.exception("Database error during schema migration") - return False - except Exception: # pylint: disable=broad-except - _LOGGER.exception("Error during schema migration") - return False - else: - self._setup_run() - return True - finally: - self.migration_in_progress = False - persistent_notification.dismiss(self.hass, "recorder_database_migration") - - def _lock_database(self, task: DatabaseLockTask) -> None: - @callback - def _async_set_database_locked(task: DatabaseLockTask) -> None: - task.database_locked.set() - - with write_lock_db_sqlite(self): - # Notify that lock is being held, wait until database can be used again. - self.hass.add_job(_async_set_database_locked, task) - while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT): - if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: - _LOGGER.warning( - "Database queue backlog reached more than 90% of maximum queue " - "length while waiting for backup to finish; recorder will now " - "resume writing to database. The backup can not be trusted and " - "must be restarted" - ) - task.queue_overflow = True - break - _LOGGER.info( - "Database queue backlog reached %d entries during backup", - self.queue.qsize(), - ) - - def _process_one_event(self, event: Event) -> None: - if not self.enabled: - return - if event.event_type == EVENT_STATE_CHANGED: - self._process_state_changed_event_into_session(event) - else: - self._process_non_state_changed_event_into_session(event) - # Commit if the commit interval is zero - if not self.commit_interval: - self._commit_event_session_or_retry() - - def _find_shared_attr_in_db(self, attr_hash: int, shared_attrs: str) -> int | None: - """Find shared attributes in the db from the hash and shared_attrs.""" - # - # Avoid the event session being flushed since it will - # commit all the pending events and states to the database. - # - # The lookup has already have checked to see if the data is cached - # or going to be written in the next commit so there is no - # need to flush before checking the database. - # - assert self.event_session is not None - with self.event_session.no_autoflush: - if attributes_id := self.event_session.execute( - find_shared_attributes_id(attr_hash, shared_attrs) - ).first(): - return cast(int, attributes_id[0]) - return None - - def _find_shared_data_in_db(self, data_hash: int, shared_data: str) -> int | None: - """Find shared event data in the db from the hash and shared_attrs.""" - # - # Avoid the event session being flushed since it will - # commit all the pending events and states to the database. - # - # The lookup has already have checked to see if the data is cached - # or going to be written in the next commit so there is no - # need to flush before checking the database. 
- # - assert self.event_session is not None - with self.event_session.no_autoflush: - if data_id := self.event_session.execute( - find_shared_data_id(data_hash, shared_data) - ).first(): - return cast(int, data_id[0]) - return None - - def _process_non_state_changed_event_into_session(self, event: Event) -> None: - """Process any event into the session except state changed.""" - assert self.event_session is not None - dbevent = Events.from_event(event) - if not event.data: - self.event_session.add(dbevent) - return - - try: - shared_data = EventData.shared_data_from_event(event) - except (TypeError, ValueError) as ex: - _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex) - return - - # Matching attributes found in the pending commit - if pending_event_data := self._pending_event_data.get(shared_data): - dbevent.event_data_rel = pending_event_data - # Matching attributes id found in the cache - elif data_id := self._event_data_ids.get(shared_data): - dbevent.data_id = data_id - else: - data_hash = EventData.hash_shared_data(shared_data) - # Matching attributes found in the database - if data_id := self._find_shared_data_in_db(data_hash, shared_data): - self._event_data_ids[shared_data] = dbevent.data_id = data_id - # No matching attributes found, save them in the DB - else: - dbevent_data = EventData(shared_data=shared_data, hash=data_hash) - dbevent.event_data_rel = self._pending_event_data[ - shared_data - ] = dbevent_data - self.event_session.add(dbevent_data) - - self.event_session.add(dbevent) - - def _process_state_changed_event_into_session(self, event: Event) -> None: - """Process a state_changed event into the session.""" - assert self.event_session is not None - try: - dbstate = States.from_event(event) - shared_attrs = StateAttributes.shared_attrs_from_event( - event, self._exclude_attributes_by_domain - ) - except (TypeError, ValueError) as ex: - _LOGGER.warning( - "State is not JSON serializable: %s: %s", - event.data.get("new_state"), - ex, - ) - return - - dbstate.attributes = None - # Matching attributes found in the pending commit - if pending_attributes := self._pending_state_attributes.get(shared_attrs): - dbstate.state_attributes = pending_attributes - # Matching attributes id found in the cache - elif attributes_id := self._state_attributes_ids.get(shared_attrs): - dbstate.attributes_id = attributes_id - else: - attr_hash = StateAttributes.hash_shared_attrs(shared_attrs) - # Matching attributes found in the database - if attributes_id := self._find_shared_attr_in_db(attr_hash, shared_attrs): - dbstate.attributes_id = attributes_id - self._state_attributes_ids[shared_attrs] = attributes_id - # No matching attributes found, save them in the DB - else: - dbstate_attributes = StateAttributes( - shared_attrs=shared_attrs, hash=attr_hash - ) - dbstate.state_attributes = dbstate_attributes - self._pending_state_attributes[shared_attrs] = dbstate_attributes - self.event_session.add(dbstate_attributes) - - if old_state := self._old_states.pop(dbstate.entity_id, None): - if old_state.state_id: - dbstate.old_state_id = old_state.state_id - else: - dbstate.old_state = old_state - if event.data.get("new_state"): - self._old_states[dbstate.entity_id] = dbstate - self._pending_expunge.append(dbstate) - else: - dbstate.state = None - self.event_session.add(dbstate) - - def _handle_database_error(self, err: Exception) -> bool: - """Handle a database error that may result in moving away the corrupt db.""" - if isinstance(err.__cause__, sqlite3.DatabaseError): - 
_LOGGER.exception( - "Unrecoverable sqlite3 database corruption detected: %s", err - ) - self._handle_sqlite_corruption() - return True - return False - - def _event_session_has_pending_writes(self) -> bool: - return bool( - self.event_session and (self.event_session.new or self.event_session.dirty) - ) - - def _commit_event_session_or_retry(self) -> None: - """Commit the event session if there is work to do.""" - if not self._event_session_has_pending_writes(): - return - tries = 1 - while tries <= self.db_max_retries: - try: - self._commit_event_session() - return - except (exc.InternalError, exc.OperationalError) as err: - _LOGGER.error( - "%s: Error executing query: %s. (retrying in %s seconds)", - INVALIDATED_ERR if err.connection_invalidated else CONNECTIVITY_ERR, - err, - self.db_retry_wait, - ) - if tries == self.db_max_retries: - raise - - tries += 1 - time.sleep(self.db_retry_wait) - - def _commit_event_session(self) -> None: - assert self.event_session is not None - self._commits_without_expire += 1 - - if self._pending_expunge: - self.event_session.flush() - for dbstate in self._pending_expunge: - # Expunge the state so its not expired - # until we use it later for dbstate.old_state - if dbstate in self.event_session: - self.event_session.expunge(dbstate) - self._pending_expunge = [] - self.event_session.commit() - - # We just committed the state attributes to the database - # and we now know the attributes_ids. We can save - # many selects for matching attributes by loading them - # into the LRU cache now. - for state_attr in self._pending_state_attributes.values(): - self._state_attributes_ids[ - state_attr.shared_attrs - ] = state_attr.attributes_id - self._pending_state_attributes = {} - for event_data in self._pending_event_data.values(): - self._event_data_ids[event_data.shared_data] = event_data.data_id - self._pending_event_data = {} - - # Expire is an expensive operation (frequently more expensive - # than the flush and commit itself) so we only - # do it after EXPIRE_AFTER_COMMITS commits - if self._commits_without_expire >= EXPIRE_AFTER_COMMITS: - self._commits_without_expire = 0 - self.event_session.expire_all() - - def _handle_sqlite_corruption(self) -> None: - """Handle the sqlite3 database being corrupt.""" - self._close_event_session() - self._close_connection() - move_away_broken_database(dburl_to_path(self.db_url)) - self.run_history.reset() - self._setup_recorder() - self._setup_run() - - def _close_event_session(self) -> None: - """Close the event session.""" - self._old_states = {} - self._state_attributes_ids = {} - self._event_data_ids = {} - self._pending_state_attributes = {} - self._pending_event_data = {} - - if not self.event_session: - return - - try: - self.event_session.rollback() - self.event_session.close() - except SQLAlchemyError as err: - _LOGGER.exception( - "Error while rolling back and closing the event session: %s", err - ) - - def _reopen_event_session(self) -> None: - """Rollback the event session and reopen it after a failure.""" - self._close_event_session() - self._open_event_session() - - def _open_event_session(self) -> None: - """Open the event session.""" - assert self.get_session is not None - self.event_session = self.get_session() - self.event_session.expire_on_commit = False - - def _send_keep_alive(self) -> None: - """Send a keep alive to keep the db connection open.""" - assert self.event_session is not None - _LOGGER.debug("Sending keepalive") - self.event_session.connection().scalar(select([1])) - - @callback - def 
event_listener(self, event: Event) -> None: - """Listen for new events and put them in the process queue.""" - self.queue.put(EventTask(event)) - - def block_till_done(self) -> None: - """Block till all events processed. - - This is only called in tests. - - This only blocks until the queue is empty - which does not mean the recorder is done. - - Call tests.common's wait_recording_done - after calling this to ensure the data - is in the database. - """ - self._queue_watch.clear() - self.queue.put(WaitTask()) - self._queue_watch.wait() - - async def lock_database(self) -> bool: - """Lock database so it can be backed up safely.""" - if not self.using_sqlite(): - _LOGGER.debug( - "Not a SQLite database or not connected, locking not necessary" - ) - return True - - if self._database_lock_task: - _LOGGER.warning("Database already locked") - return False - - database_locked = asyncio.Event() - task = DatabaseLockTask(database_locked, threading.Event(), False) - self.queue.put(task) - try: - await asyncio.wait_for(database_locked.wait(), timeout=DB_LOCK_TIMEOUT) - except asyncio.TimeoutError as err: - task.database_unlock.set() - raise TimeoutError( - f"Could not lock database within {DB_LOCK_TIMEOUT} seconds." - ) from err - self._database_lock_task = task - return True - - @callback - def unlock_database(self) -> bool: - """Unlock database. - - Returns true if database lock has been held throughout the process. - """ - if not self.using_sqlite(): - _LOGGER.debug( - "Not a SQLite database or not connected, unlocking not necessary" - ) - return True - - if not self._database_lock_task: - _LOGGER.warning("Database currently not locked") - return False - - self._database_lock_task.database_unlock.set() - success = not self._database_lock_task.queue_overflow - - self._database_lock_task = None - - return success - - def _setup_connection(self) -> None: - """Ensure database is ready to fly.""" - kwargs: dict[str, Any] = {} - self._completed_first_database_setup = False - - def setup_recorder_connection( - dbapi_connection: Any, connection_record: Any - ) -> None: - """Dbapi specific connection settings.""" - assert self.engine is not None - setup_connection_for_dialect( - self, - self.engine.dialect.name, - dbapi_connection, - not self._completed_first_database_setup, - ) - self._completed_first_database_setup = True - - if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url: - kwargs["connect_args"] = {"check_same_thread": False} - kwargs["poolclass"] = MutexPool - MutexPool.pool_lock = threading.RLock() - kwargs["pool_reset_on_return"] = None - elif self.db_url.startswith(SQLITE_URL_PREFIX): - kwargs["poolclass"] = RecorderPool - else: - kwargs["echo"] = False - - if self._using_file_sqlite: - validate_or_move_away_sqlite_database(self.db_url) - - self.engine = create_engine(self.db_url, **kwargs, future=True) - - sqlalchemy_event.listen(self.engine, "connect", setup_recorder_connection) - - Base.metadata.create_all(self.engine) - self.get_session = scoped_session(sessionmaker(bind=self.engine, future=True)) - _LOGGER.debug("Connected to recorder database") - - @property - def _using_file_sqlite(self) -> bool: - """Short version to check if we are using sqlite3 as a file.""" - return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith( - SQLITE_URL_PREFIX - ) - - def _close_connection(self) -> None: - """Close the connection.""" - assert self.engine is not None - self.engine.dispose() - self.engine = None - self.get_session = None - - def _setup_run(self) -> None: - """Log the 
start of the current run and schedule any needed jobs.""" - assert self.get_session is not None - with session_scope(session=self.get_session()) as session: - end_incomplete_runs(session, self.run_history.recording_start) - self.run_history.start(session) - self._schedule_compile_missing_statistics(session) - - self._open_event_session() - - def _schedule_compile_missing_statistics(self, session: Session) -> None: - """Add tasks for missing statistics runs.""" - now = dt_util.utcnow() - last_period_minutes = now.minute - now.minute % 5 - last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0) - start = now - timedelta(days=self.keep_days) - start = start.replace(minute=0, second=0, microsecond=0) - - # Find the newest statistics run, if any - if last_run := session.query(func.max(StatisticsRuns.start)).scalar(): - start = max(start, process_timestamp(last_run) + timedelta(minutes=5)) - - # Add tasks - while start < last_period: - end = start + timedelta(minutes=5) - _LOGGER.debug("Compiling missing statistics for %s-%s", start, end) - self.queue.put(StatisticsTask(start)) - start = end - - def _end_session(self) -> None: - """End the recorder session.""" - if self.event_session is None: - return - try: - self.run_history.end(self.event_session) - self._commit_event_session_or_retry() - self.event_session.close() - except Exception as err: # pylint: disable=broad-except - _LOGGER.exception("Error saving the event session during shutdown: %s", err) - - self.run_history.clear() - - def _shutdown(self) -> None: - """Save end time for current run.""" - self.hass.add_job(self._async_stop_listeners) - self._stop_executor() - self._end_session() - self._close_connection() - - @property - def recording(self) -> bool: - """Return if the recorder is recording.""" - return self._event_listener is not None diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py index 593710a10dd..5b623ee5e04 100644 --- a/homeassistant/components/recorder/const.py +++ b/homeassistant/components/recorder/const.py @@ -28,3 +28,12 @@ DB_WORKER_PREFIX = "DbWorker" JSON_DUMP: Final = partial(json.dumps, cls=JSONEncoder, separators=(",", ":")) ALL_DOMAIN_EXCLUDE_ATTRS = {ATTR_ATTRIBUTION, ATTR_RESTORED, ATTR_SUPPORTED_FEATURES} + +ATTR_KEEP_DAYS = "keep_days" +ATTR_REPACK = "repack" +ATTR_APPLY_FILTER = "apply_filter" + +KEEPALIVE_TIME = 30 + + +EXCLUDE_ATTRIBUTES = f"{DOMAIN}_exclude_attributes_by_domain" diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py new file mode 100644 index 00000000000..b96eee2c67f --- /dev/null +++ b/homeassistant/components/recorder/core.py @@ -0,0 +1,1082 @@ +"""Support for recording details.""" +from __future__ import annotations + +import asyncio +from collections.abc import Callable, Iterable +from datetime import datetime, timedelta +import logging +import queue +import sqlite3 +import threading +import time +from typing import Any, TypeVar, cast + +from lru import LRU # pylint: disable=no-name-in-module +from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select +from sqlalchemy.engine import Engine +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.orm.session import Session + +from homeassistant.components import persistent_notification +from homeassistant.const import ( + ATTR_ENTITY_ID, + EVENT_HOMEASSISTANT_FINAL_WRITE, + EVENT_HOMEASSISTANT_STARTED, + EVENT_HOMEASSISTANT_STOP, + 
EVENT_STATE_CHANGED, + MATCH_ALL, +) +from homeassistant.core import CALLBACK_TYPE, CoreState, Event, HomeAssistant, callback +from homeassistant.helpers.entityfilter import generate_filter +from homeassistant.helpers.event import ( + async_track_time_change, + async_track_time_interval, + async_track_utc_time_change, +) +import homeassistant.util.dt as dt_util + +from . import migration, statistics +from .const import ( + ATTR_APPLY_FILTER, + ATTR_KEEP_DAYS, + ATTR_REPACK, + DB_WORKER_PREFIX, + KEEPALIVE_TIME, + MAX_QUEUE_BACKLOG, + SQLITE_URL_PREFIX, +) +from .executor import DBInterruptibleThreadPoolExecutor +from .models import ( + Base, + EventData, + Events, + StateAttributes, + States, + StatisticData, + StatisticMetaData, + StatisticsRuns, + process_timestamp, +) +from .pool import POOL_SIZE, MutexPool, RecorderPool +from .queries import find_shared_attributes_id, find_shared_data_id +from .run_history import RunHistory +from .tasks import ( + AdjustStatisticsTask, + ClearStatisticsTask, + CommitTask, + DatabaseLockTask, + EventTask, + ExternalStatisticsTask, + KeepAliveTask, + PerodicCleanupTask, + PurgeEntitiesTask, + PurgeTask, + RecorderTask, + StatisticsTask, + StopTask, + UpdateStatisticsMetadataTask, + WaitTask, +) +from .util import ( + dburl_to_path, + end_incomplete_runs, + is_second_sunday, + move_away_broken_database, + session_scope, + setup_connection_for_dialect, + validate_or_move_away_sqlite_database, + write_lock_db_sqlite, +) + +_LOGGER = logging.getLogger(__name__) + +T = TypeVar("T") + +DEFAULT_URL = "sqlite:///{hass_config_path}" + +# Controls how often we clean up +# States and Events objects +EXPIRE_AFTER_COMMITS = 120 + +# The number of attribute ids to cache in memory +# +# Based on: +# - The number of overlapping attributes +# - How frequently states with overlapping attributes will change +# - How much memory our low end hardware has +STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048 +EVENT_DATA_ID_CACHE_SIZE = 2048 + +SHUTDOWN_TASK = object() + +COMMIT_TASK = CommitTask() +KEEP_ALIVE_TASK = KeepAliveTask() + +DB_LOCK_TIMEOUT = 30 +DB_LOCK_QUEUE_CHECK_TIMEOUT = 1 + + +INVALIDATED_ERR = "Database connection invalidated" +CONNECTIVITY_ERR = "Error in database connectivity during commit" + +# Pool size must accommodate Recorder thread + All db executors +MAX_DB_EXECUTOR_WORKERS = POOL_SIZE - 1 + + +class Recorder(threading.Thread): + """A threaded recorder class.""" + + stop_requested: bool + + def __init__( + self, + hass: HomeAssistant, + auto_purge: bool, + auto_repack: bool, + keep_days: int, + commit_interval: int, + uri: str, + db_max_retries: int, + db_retry_wait: int, + entity_filter: Callable[[str], bool], + exclude_t: list[str], + exclude_attributes_by_domain: dict[str, set[str]], + ) -> None: + """Initialize the recorder.""" + threading.Thread.__init__(self, name="Recorder") + + self.hass = hass + self.auto_purge = auto_purge + self.auto_repack = auto_repack + self.keep_days = keep_days + self._hass_started: asyncio.Future[object] = asyncio.Future() + self.commit_interval = commit_interval + self.queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue() + self.db_url = uri + self.db_max_retries = db_max_retries + self.db_retry_wait = db_retry_wait + self.async_db_ready: asyncio.Future[bool] = asyncio.Future() + self.async_recorder_ready = asyncio.Event() + self._queue_watch = threading.Event() + self.engine: Engine | None = None + self.run_history = RunHistory() + + self.entity_filter = entity_filter + self.exclude_t = exclude_t + + 
self._commits_without_expire = 0 + self._old_states: dict[str, States] = {} + self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE) + self._event_data_ids: LRU = LRU(EVENT_DATA_ID_CACHE_SIZE) + self._pending_state_attributes: dict[str, StateAttributes] = {} + self._pending_event_data: dict[str, EventData] = {} + self._pending_expunge: list[States] = [] + self.event_session: Session | None = None + self.get_session: Callable[[], Session] | None = None + self._completed_first_database_setup: bool | None = None + self._event_listener: CALLBACK_TYPE | None = None + self.async_migration_event = asyncio.Event() + self.migration_in_progress = False + self._queue_watcher: CALLBACK_TYPE | None = None + self._db_supports_row_number = True + self._database_lock_task: DatabaseLockTask | None = None + self._db_executor: DBInterruptibleThreadPoolExecutor | None = None + self._exclude_attributes_by_domain = exclude_attributes_by_domain + + self._keep_alive_listener: CALLBACK_TYPE | None = None + self._commit_listener: CALLBACK_TYPE | None = None + self._periodic_listener: CALLBACK_TYPE | None = None + self._nightly_listener: CALLBACK_TYPE | None = None + self.enabled = True + + def set_enable(self, enable: bool) -> None: + """Enable or disable recording events and states.""" + self.enabled = enable + + @callback + def async_start_executor(self) -> None: + """Start the executor.""" + self._db_executor = DBInterruptibleThreadPoolExecutor( + thread_name_prefix=DB_WORKER_PREFIX, + max_workers=MAX_DB_EXECUTOR_WORKERS, + shutdown_hook=self._shutdown_pool, + ) + + def _shutdown_pool(self) -> None: + """Close the dbpool connections in the current thread.""" + if self.engine and hasattr(self.engine.pool, "shutdown"): + self.engine.pool.shutdown() + + @callback + def async_initialize(self) -> None: + """Initialize the recorder.""" + self._event_listener = self.hass.bus.async_listen( + MATCH_ALL, self.event_listener, event_filter=self._async_event_filter + ) + self._queue_watcher = async_track_time_interval( + self.hass, self._async_check_queue, timedelta(minutes=10) + ) + + @callback + def _async_keep_alive(self, now: datetime) -> None: + """Queue a keep alive.""" + if self._event_listener: + self.queue.put(KEEP_ALIVE_TASK) + + @callback + def _async_commit(self, now: datetime) -> None: + """Queue a commit.""" + if ( + self._event_listener + and not self._database_lock_task + and self._event_session_has_pending_writes() + ): + self.queue.put(COMMIT_TASK) + + @callback + def async_add_executor_job( + self, target: Callable[..., T], *args: Any + ) -> asyncio.Future[T]: + """Add an executor job from within the event loop.""" + return self.hass.loop.run_in_executor(self._db_executor, target, *args) + + def _stop_executor(self) -> None: + """Stop the executor.""" + assert self._db_executor is not None + self._db_executor.shutdown() + self._db_executor = None + + @callback + def _async_check_queue(self, *_: Any) -> None: + """Periodic check of the queue size to ensure we do not exaust memory. + + The queue grows during migraton or if something really goes wrong. 
+ """ + size = self.queue.qsize() + _LOGGER.debug("Recorder queue size is: %s", size) + if size <= MAX_QUEUE_BACKLOG: + return + _LOGGER.error( + "The recorder backlog queue reached the maximum size of %s events; " + "usually, the system is CPU bound, I/O bound, or the database " + "is corrupt due to a disk problem; The recorder will stop " + "recording events to avoid running out of memory", + MAX_QUEUE_BACKLOG, + ) + self._async_stop_queue_watcher_and_event_listener() + + @callback + def _async_stop_queue_watcher_and_event_listener(self) -> None: + """Stop watching the queue and listening for events.""" + if self._queue_watcher: + self._queue_watcher() + self._queue_watcher = None + if self._event_listener: + self._event_listener() + self._event_listener = None + + @callback + def _async_stop_listeners(self) -> None: + """Stop listeners.""" + self._async_stop_queue_watcher_and_event_listener() + if self._keep_alive_listener: + self._keep_alive_listener() + self._keep_alive_listener = None + if self._commit_listener: + self._commit_listener() + self._commit_listener = None + if self._nightly_listener: + self._nightly_listener() + self._nightly_listener = None + if self._periodic_listener: + self._periodic_listener() + self._periodic_listener = None + + @callback + def _async_event_filter(self, event: Event) -> bool: + """Filter events.""" + if event.event_type in self.exclude_t: + return False + + if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None: + return True + + if isinstance(entity_id, str): + return self.entity_filter(entity_id) + + if isinstance(entity_id, list): + for eid in entity_id: + if self.entity_filter(eid): + return True + return False + + # Unknown what it is. + return True + + def do_adhoc_purge(self, **kwargs: Any) -> None: + """Trigger an adhoc purge retaining keep_days worth of data.""" + keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days) + repack = cast(bool, kwargs[ATTR_REPACK]) + apply_filter = cast(bool, kwargs[ATTR_APPLY_FILTER]) + + purge_before = dt_util.utcnow() - timedelta(days=keep_days) + self.queue.put(PurgeTask(purge_before, repack, apply_filter)) + + def do_adhoc_purge_entities( + self, entity_ids: set[str], domains: list[str], entity_globs: list[str] + ) -> None: + """Trigger an adhoc purge of requested entities.""" + entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs) + self.queue.put(PurgeEntitiesTask(entity_filter)) + + def do_adhoc_statistics(self, **kwargs: Any) -> None: + """Trigger an adhoc statistics run.""" + if not (start := kwargs.get("start")): + start = statistics.get_start_time() + self.queue.put(StatisticsTask(start)) + + @callback + def async_register(self) -> None: + """Post connection initialize.""" + + def _empty_queue(event: Event) -> None: + """Empty the queue if its still present at final write.""" + + # If the queue is full of events to be processed because + # the database is so broken that every event results in a retry + # we will never be able to get though the events to shutdown in time. + # + # We drain all the events in the queue and then insert + # an empty one to ensure the next thing the recorder sees + # is a request to shutdown. 
+ while True: + try: + self.queue.get_nowait() + except queue.Empty: + break + self.queue.put(StopTask()) + + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue) + + async def _async_shutdown(event: Event) -> None: + """Shut down the Recorder.""" + if not self._hass_started.done(): + self._hass_started.set_result(SHUTDOWN_TASK) + self.queue.put(StopTask()) + self._async_stop_listeners() + await self.hass.async_add_executor_job(self.join) + + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown) + + if self.hass.state == CoreState.running: + self._hass_started.set_result(None) + return + + @callback + def _async_hass_started(event: Event) -> None: + """Notify that hass has started.""" + self._hass_started.set_result(None) + + self.hass.bus.async_listen_once( + EVENT_HOMEASSISTANT_STARTED, _async_hass_started + ) + + @callback + def async_connection_failed(self) -> None: + """Connect failed tasks.""" + self.async_db_ready.set_result(False) + persistent_notification.async_create( + self.hass, + "The recorder could not start, check [the logs](/config/logs)", + "Recorder", + ) + self._async_stop_listeners() + + @callback + def async_connection_success(self) -> None: + """Connect success tasks.""" + self.async_db_ready.set_result(True) + self.async_start_executor() + + @callback + def _async_recorder_ready(self) -> None: + """Finish start and mark recorder ready.""" + self._async_setup_periodic_tasks() + self.async_recorder_ready.set() + + @callback + def async_nightly_tasks(self, now: datetime) -> None: + """Trigger the purge.""" + if self.auto_purge: + # Purge will schedule the periodic cleanups + # after it completes to ensure it does not happen + # until after the database is vacuumed + repack = self.auto_repack and is_second_sunday(now) + purge_before = dt_util.utcnow() - timedelta(days=self.keep_days) + self.queue.put(PurgeTask(purge_before, repack=repack, apply_filter=False)) + else: + self.queue.put(PerodicCleanupTask()) + + @callback + def async_periodic_statistics(self, now: datetime) -> None: + """Trigger the statistics run. 
+ + Short term statistics run every 5 minutes + """ + start = statistics.get_start_time() + self.queue.put(StatisticsTask(start)) + + @callback + def async_adjust_statistics( + self, statistic_id: str, start_time: datetime, sum_adjustment: float + ) -> None: + """Adjust statistics.""" + self.queue.put(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment)) + + @callback + def async_clear_statistics(self, statistic_ids: list[str]) -> None: + """Clear statistics for a list of statistic_ids.""" + self.queue.put(ClearStatisticsTask(statistic_ids)) + + @callback + def async_update_statistics_metadata( + self, statistic_id: str, unit_of_measurement: str | None + ) -> None: + """Update statistics metadata for a statistic_id.""" + self.queue.put(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement)) + + @callback + def async_external_statistics( + self, metadata: StatisticMetaData, stats: Iterable[StatisticData] + ) -> None: + """Schedule external statistics.""" + self.queue.put(ExternalStatisticsTask(metadata, stats)) + + @callback + def using_sqlite(self) -> bool: + """Return if recorder uses sqlite as the engine.""" + return bool(self.engine and self.engine.dialect.name == "sqlite") + + @callback + def _async_setup_periodic_tasks(self) -> None: + """Prepare periodic tasks.""" + if self.hass.is_stopping or not self.get_session: + # Home Assistant is shutting down + return + + # If the db is using a socket connection, we need to keep alive + # to prevent errors from unexpected disconnects + if not self.using_sqlite(): + self._keep_alive_listener = async_track_time_interval( + self.hass, self._async_keep_alive, timedelta(seconds=KEEPALIVE_TIME) + ) + + # If the commit interval is not 0, we need to commit periodically + if self.commit_interval: + self._commit_listener = async_track_time_interval( + self.hass, self._async_commit, timedelta(seconds=self.commit_interval) + ) + + # Run nightly tasks at 4:12am + self._nightly_listener = async_track_time_change( + self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0 + ) + + # Compile short term statistics every 5 minutes + self._periodic_listener = async_track_utc_time_change( + self.hass, self.async_periodic_statistics, minute=range(0, 60, 5), second=10 + ) + + async def _async_wait_for_started(self) -> object | None: + """Wait for the hass started future.""" + return await self._hass_started + + def _wait_startup_or_shutdown(self) -> object | None: + """Wait for startup or shutdown before starting.""" + return asyncio.run_coroutine_threadsafe( + self._async_wait_for_started(), self.hass.loop + ).result() + + def run(self) -> None: + """Start processing events to save.""" + current_version = self._setup_recorder() + + if current_version is None: + self.hass.add_job(self.async_connection_failed) + return + + schema_is_current = migration.schema_is_current(current_version) + if schema_is_current: + self._setup_run() + else: + self.migration_in_progress = True + + self.hass.add_job(self.async_connection_success) + + # If shutdown happened before Home Assistant finished starting + if self._wait_startup_or_shutdown() is SHUTDOWN_TASK: + self.migration_in_progress = False + # Make sure we cleanly close the run if + # we restart before startup finishes + self._shutdown() + return + + # We wait to start the migration until startup has finished + # since it can be cpu intensive and we do not want it to compete + # with startup which is also cpu intensive + if not schema_is_current: + if 
self._migrate_schema_and_setup_run(current_version): + if not self._event_listener: + # If the schema migration takes so long that the end + # queue watcher safety kicks in because MAX_QUEUE_BACKLOG + # is reached, we need to reinitialize the listener. + self.hass.add_job(self.async_initialize) + else: + persistent_notification.create( + self.hass, + "The database migration failed, check [the logs](/config/logs).", + "Database Migration Failed", + "recorder_database_migration", + ) + self._shutdown() + return + + _LOGGER.debug("Recorder processing the queue") + self.hass.add_job(self._async_recorder_ready) + self._run_event_loop() + + def _run_event_loop(self) -> None: + """Run the event loop for the recorder.""" + # Use a session for the event read loop + # with a commit every time the event time + # has changed. This reduces the disk io. + self.stop_requested = False + while not self.stop_requested: + task = self.queue.get() + _LOGGER.debug("Processing task: %s", task) + try: + self._process_one_task_or_recover(task) + except Exception as err: # pylint: disable=broad-except + _LOGGER.exception("Error while processing event %s: %s", task, err) + + self._shutdown() + + def _process_one_task_or_recover(self, task: RecorderTask) -> None: + """Process an event, reconnect, or recover a malformed database.""" + try: + # If it's not an event, commit everything + # that is pending before running the task + if task.commit_before: + self._commit_event_session_or_retry() + return task.run(self) + except exc.DatabaseError as err: + if self._handle_database_error(err): + return + _LOGGER.exception( + "Unhandled database error while processing task %s: %s", task, err + ) + except SQLAlchemyError as err: + _LOGGER.exception("SQLAlchemyError error processing task %s: %s", task, err) + + # Reset the session if an SQLAlchemyError (including DatabaseError) + # happens, to roll back and recover + self._reopen_event_session() + + def _setup_recorder(self) -> None | int: + """Connect to the database and get the schema version.""" + tries = 1 + + while tries <= self.db_max_retries: + try: + self._setup_connection() + return migration.get_schema_version(self) + except Exception as err: # pylint: disable=broad-except + _LOGGER.exception( + "Error during connection setup: %s (retrying in %s seconds)", + err, + self.db_retry_wait, + ) + tries += 1 + time.sleep(self.db_retry_wait) + + return None + + @callback + def _async_migration_started(self) -> None: + """Set the migration started event.""" + self.async_migration_event.set() + + def _migrate_schema_and_setup_run(self, current_version: int) -> bool: + """Migrate schema to the latest version.""" + persistent_notification.create( + self.hass, + "System performance will temporarily degrade during the database upgrade. Do not power down or restart the system until the upgrade completes.
Integrations that read the database, such as logbook and history, may return inconsistent results until the upgrade completes.", + "Database upgrade in progress", + "recorder_database_migration", + ) + self.hass.add_job(self._async_migration_started) + + try: + migration.migrate_schema(self, current_version) + except exc.DatabaseError as err: + if self._handle_database_error(err): + return True + _LOGGER.exception("Database error during schema migration") + return False + except Exception: # pylint: disable=broad-except + _LOGGER.exception("Error during schema migration") + return False + else: + self._setup_run() + return True + finally: + self.migration_in_progress = False + persistent_notification.dismiss(self.hass, "recorder_database_migration") + + def _lock_database(self, task: DatabaseLockTask) -> None: + @callback + def _async_set_database_locked(task: DatabaseLockTask) -> None: + task.database_locked.set() + + with write_lock_db_sqlite(self): + # Notify that lock is being held, wait until database can be used again. + self.hass.add_job(_async_set_database_locked, task) + while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT): + if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9: + _LOGGER.warning( + "Database queue backlog reached more than 90% of maximum queue " + "length while waiting for backup to finish; recorder will now " + "resume writing to database. The backup cannot be trusted and " + "must be restarted" + ) + task.queue_overflow = True + break + _LOGGER.info( + "Database queue backlog reached %d entries during backup", + self.queue.qsize(), + ) + + def _process_one_event(self, event: Event) -> None: + if not self.enabled: + return + if event.event_type == EVENT_STATE_CHANGED: + self._process_state_changed_event_into_session(event) + else: + self._process_non_state_changed_event_into_session(event) + # Commit if the commit interval is zero + if not self.commit_interval: + self._commit_event_session_or_retry() + + def _find_shared_attr_in_db(self, attr_hash: int, shared_attrs: str) -> int | None: + """Find shared attributes in the db from the hash and shared_attrs.""" + # + # Avoid the event session being flushed since it will + # commit all the pending events and states to the database. + # + # The lookup has already checked to see if the data is cached + # or going to be written in the next commit so there is no + # need to flush before checking the database. + # + assert self.event_session is not None + with self.event_session.no_autoflush: + if attributes_id := self.event_session.execute( + find_shared_attributes_id(attr_hash, shared_attrs) + ).first(): + return cast(int, attributes_id[0]) + return None + + def _find_shared_data_in_db(self, data_hash: int, shared_data: str) -> int | None: + """Find shared event data in the db from the hash and shared_data.""" + # + # Avoid the event session being flushed since it will + # commit all the pending events and states to the database. + # + # The lookup has already checked to see if the data is cached + # or going to be written in the next commit so there is no + # need to flush before checking the database.
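# Lookup order for shared event data mirrors the state-attributes path above:
#   1. self._pending_event_data - rows already staged in the current commit
#   2. the self._event_data_ids LRU cache - data_ids from recently committed rows
#   3. this query via find_shared_data_id() as the final fallback
# so no flush is needed before querying the database.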
+ # + assert self.event_session is not None + with self.event_session.no_autoflush: + if data_id := self.event_session.execute( + find_shared_data_id(data_hash, shared_data) + ).first(): + return cast(int, data_id[0]) + return None + + def _process_non_state_changed_event_into_session(self, event: Event) -> None: + """Process any event into the session except state changed.""" + assert self.event_session is not None + dbevent = Events.from_event(event) + if not event.data: + self.event_session.add(dbevent) + return + + try: + shared_data = EventData.shared_data_from_event(event) + except (TypeError, ValueError) as ex: + _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex) + return + + # Matching attributes found in the pending commit + if pending_event_data := self._pending_event_data.get(shared_data): + dbevent.event_data_rel = pending_event_data + # Matching attributes id found in the cache + elif data_id := self._event_data_ids.get(shared_data): + dbevent.data_id = data_id + else: + data_hash = EventData.hash_shared_data(shared_data) + # Matching attributes found in the database + if data_id := self._find_shared_data_in_db(data_hash, shared_data): + self._event_data_ids[shared_data] = dbevent.data_id = data_id + # No matching attributes found, save them in the DB + else: + dbevent_data = EventData(shared_data=shared_data, hash=data_hash) + dbevent.event_data_rel = self._pending_event_data[ + shared_data + ] = dbevent_data + self.event_session.add(dbevent_data) + + self.event_session.add(dbevent) + + def _process_state_changed_event_into_session(self, event: Event) -> None: + """Process a state_changed event into the session.""" + assert self.event_session is not None + try: + dbstate = States.from_event(event) + shared_attrs = StateAttributes.shared_attrs_from_event( + event, self._exclude_attributes_by_domain + ) + except (TypeError, ValueError) as ex: + _LOGGER.warning( + "State is not JSON serializable: %s: %s", + event.data.get("new_state"), + ex, + ) + return + + dbstate.attributes = None + # Matching attributes found in the pending commit + if pending_attributes := self._pending_state_attributes.get(shared_attrs): + dbstate.state_attributes = pending_attributes + # Matching attributes id found in the cache + elif attributes_id := self._state_attributes_ids.get(shared_attrs): + dbstate.attributes_id = attributes_id + else: + attr_hash = StateAttributes.hash_shared_attrs(shared_attrs) + # Matching attributes found in the database + if attributes_id := self._find_shared_attr_in_db(attr_hash, shared_attrs): + dbstate.attributes_id = attributes_id + self._state_attributes_ids[shared_attrs] = attributes_id + # No matching attributes found, save them in the DB + else: + dbstate_attributes = StateAttributes( + shared_attrs=shared_attrs, hash=attr_hash + ) + dbstate.state_attributes = dbstate_attributes + self._pending_state_attributes[shared_attrs] = dbstate_attributes + self.event_session.add(dbstate_attributes) + + if old_state := self._old_states.pop(dbstate.entity_id, None): + if old_state.state_id: + dbstate.old_state_id = old_state.state_id + else: + dbstate.old_state = old_state + if event.data.get("new_state"): + self._old_states[dbstate.entity_id] = dbstate + self._pending_expunge.append(dbstate) + else: + dbstate.state = None + self.event_session.add(dbstate) + + def _handle_database_error(self, err: Exception) -> bool: + """Handle a database error that may result in moving away the corrupt db.""" + if isinstance(err.__cause__, sqlite3.DatabaseError): + 
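# Only sqlite3-level corruption (err.__cause__ is sqlite3.DatabaseError) is treated
# as unrecoverable here: _handle_sqlite_corruption() moves the broken database file
# aside and reconnects to a fresh one. Any other DatabaseError returns False so the
# caller can log it and reopen the event session instead.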
_LOGGER.exception( + "Unrecoverable sqlite3 database corruption detected: %s", err + ) + self._handle_sqlite_corruption() + return True + return False + + def _event_session_has_pending_writes(self) -> bool: + return bool( + self.event_session and (self.event_session.new or self.event_session.dirty) + ) + + def _commit_event_session_or_retry(self) -> None: + """Commit the event session if there is work to do.""" + if not self._event_session_has_pending_writes(): + return + tries = 1 + while tries <= self.db_max_retries: + try: + self._commit_event_session() + return + except (exc.InternalError, exc.OperationalError) as err: + _LOGGER.error( + "%s: Error executing query: %s. (retrying in %s seconds)", + INVALIDATED_ERR if err.connection_invalidated else CONNECTIVITY_ERR, + err, + self.db_retry_wait, + ) + if tries == self.db_max_retries: + raise + + tries += 1 + time.sleep(self.db_retry_wait) + + def _commit_event_session(self) -> None: + assert self.event_session is not None + self._commits_without_expire += 1 + + if self._pending_expunge: + self.event_session.flush() + for dbstate in self._pending_expunge: + # Expunge the state so its not expired + # until we use it later for dbstate.old_state + if dbstate in self.event_session: + self.event_session.expunge(dbstate) + self._pending_expunge = [] + self.event_session.commit() + + # We just committed the state attributes to the database + # and we now know the attributes_ids. We can save + # many selects for matching attributes by loading them + # into the LRU cache now. + for state_attr in self._pending_state_attributes.values(): + self._state_attributes_ids[ + state_attr.shared_attrs + ] = state_attr.attributes_id + self._pending_state_attributes = {} + for event_data in self._pending_event_data.values(): + self._event_data_ids[event_data.shared_data] = event_data.data_id + self._pending_event_data = {} + + # Expire is an expensive operation (frequently more expensive + # than the flush and commit itself) so we only + # do it after EXPIRE_AFTER_COMMITS commits + if self._commits_without_expire >= EXPIRE_AFTER_COMMITS: + self._commits_without_expire = 0 + self.event_session.expire_all() + + def _handle_sqlite_corruption(self) -> None: + """Handle the sqlite3 database being corrupt.""" + self._close_event_session() + self._close_connection() + move_away_broken_database(dburl_to_path(self.db_url)) + self.run_history.reset() + self._setup_recorder() + self._setup_run() + + def _close_event_session(self) -> None: + """Close the event session.""" + self._old_states = {} + self._state_attributes_ids = {} + self._event_data_ids = {} + self._pending_state_attributes = {} + self._pending_event_data = {} + + if not self.event_session: + return + + try: + self.event_session.rollback() + self.event_session.close() + except SQLAlchemyError as err: + _LOGGER.exception( + "Error while rolling back and closing the event session: %s", err + ) + + def _reopen_event_session(self) -> None: + """Rollback the event session and reopen it after a failure.""" + self._close_event_session() + self._open_event_session() + + def _open_event_session(self) -> None: + """Open the event session.""" + assert self.get_session is not None + self.event_session = self.get_session() + self.event_session.expire_on_commit = False + + def _send_keep_alive(self) -> None: + """Send a keep alive to keep the db connection open.""" + assert self.event_session is not None + _LOGGER.debug("Sending keepalive") + self.event_session.connection().scalar(select([1])) + + @callback + def 
event_listener(self, event: Event) -> None: + """Listen for new events and put them in the process queue.""" + self.queue.put(EventTask(event)) + + def block_till_done(self) -> None: + """Block till all events processed. + + This is only called in tests. + + This only blocks until the queue is empty + which does not mean the recorder is done. + + Call tests.common's wait_recording_done + after calling this to ensure the data + is in the database. + """ + self._queue_watch.clear() + self.queue.put(WaitTask()) + self._queue_watch.wait() + + async def lock_database(self) -> bool: + """Lock database so it can be backed up safely.""" + if not self.using_sqlite(): + _LOGGER.debug( + "Not a SQLite database or not connected, locking not necessary" + ) + return True + + if self._database_lock_task: + _LOGGER.warning("Database already locked") + return False + + database_locked = asyncio.Event() + task = DatabaseLockTask(database_locked, threading.Event(), False) + self.queue.put(task) + try: + await asyncio.wait_for(database_locked.wait(), timeout=DB_LOCK_TIMEOUT) + except asyncio.TimeoutError as err: + task.database_unlock.set() + raise TimeoutError( + f"Could not lock database within {DB_LOCK_TIMEOUT} seconds." + ) from err + self._database_lock_task = task + return True + + @callback + def unlock_database(self) -> bool: + """Unlock database. + + Returns true if database lock has been held throughout the process. + """ + if not self.using_sqlite(): + _LOGGER.debug( + "Not a SQLite database or not connected, unlocking not necessary" + ) + return True + + if not self._database_lock_task: + _LOGGER.warning("Database currently not locked") + return False + + self._database_lock_task.database_unlock.set() + success = not self._database_lock_task.queue_overflow + + self._database_lock_task = None + + return success + + def _setup_connection(self) -> None: + """Ensure database is ready to fly.""" + kwargs: dict[str, Any] = {} + self._completed_first_database_setup = False + + def setup_recorder_connection( + dbapi_connection: Any, connection_record: Any + ) -> None: + """Dbapi specific connection settings.""" + assert self.engine is not None + setup_connection_for_dialect( + self, + self.engine.dialect.name, + dbapi_connection, + not self._completed_first_database_setup, + ) + self._completed_first_database_setup = True + + if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url: + kwargs["connect_args"] = {"check_same_thread": False} + kwargs["poolclass"] = MutexPool + MutexPool.pool_lock = threading.RLock() + kwargs["pool_reset_on_return"] = None + elif self.db_url.startswith(SQLITE_URL_PREFIX): + kwargs["poolclass"] = RecorderPool + else: + kwargs["echo"] = False + + if self._using_file_sqlite: + validate_or_move_away_sqlite_database(self.db_url) + + self.engine = create_engine(self.db_url, **kwargs, future=True) + + sqlalchemy_event.listen(self.engine, "connect", setup_recorder_connection) + + Base.metadata.create_all(self.engine) + self.get_session = scoped_session(sessionmaker(bind=self.engine, future=True)) + _LOGGER.debug("Connected to recorder database") + + @property + def _using_file_sqlite(self) -> bool: + """Short version to check if we are using sqlite3 as a file.""" + return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith( + SQLITE_URL_PREFIX + ) + + def _close_connection(self) -> None: + """Close the connection.""" + assert self.engine is not None + self.engine.dispose() + self.engine = None + self.get_session = None + + def _setup_run(self) -> None: + """Log the 
start of the current run and schedule any needed jobs.""" + assert self.get_session is not None + with session_scope(session=self.get_session()) as session: + end_incomplete_runs(session, self.run_history.recording_start) + self.run_history.start(session) + self._schedule_compile_missing_statistics(session) + + self._open_event_session() + + def _schedule_compile_missing_statistics(self, session: Session) -> None: + """Add tasks for missing statistics runs.""" + now = dt_util.utcnow() + last_period_minutes = now.minute - now.minute % 5 + last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0) + start = now - timedelta(days=self.keep_days) + start = start.replace(minute=0, second=0, microsecond=0) + + # Find the newest statistics run, if any + if last_run := session.query(func.max(StatisticsRuns.start)).scalar(): + start = max(start, process_timestamp(last_run) + timedelta(minutes=5)) + + # Add tasks + while start < last_period: + end = start + timedelta(minutes=5) + _LOGGER.debug("Compiling missing statistics for %s-%s", start, end) + self.queue.put(StatisticsTask(start)) + start = end + + def _end_session(self) -> None: + """End the recorder session.""" + if self.event_session is None: + return + try: + self.run_history.end(self.event_session) + self._commit_event_session_or_retry() + self.event_session.close() + except Exception as err: # pylint: disable=broad-except + _LOGGER.exception("Error saving the event session during shutdown: %s", err) + + self.run_history.clear() + + def _shutdown(self) -> None: + """Save end time for current run.""" + self.hass.add_job(self._async_stop_listeners) + self._stop_executor() + self._end_session() + self._close_connection() + + @property + def recording(self) -> bool: + """Return if the recorder is recording.""" + return self._event_listener is not None diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py new file mode 100644 index 00000000000..fdffa63bcd3 --- /dev/null +++ b/homeassistant/components/recorder/tasks.py @@ -0,0 +1,250 @@ +"""Support for recording details.""" +from __future__ import annotations + +import abc +import asyncio +from collections.abc import Callable, Iterable +from dataclasses import dataclass +from datetime import datetime +import threading +from typing import TYPE_CHECKING, Any + +from homeassistant.core import Event + +from . 
import purge, statistics +from .const import DOMAIN, EXCLUDE_ATTRIBUTES +from .models import StatisticData, StatisticMetaData +from .util import periodic_db_cleanups + +if TYPE_CHECKING: + from .core import Recorder + + +class RecorderTask(abc.ABC): + """ABC for recorder tasks.""" + + commit_before = True + + @abc.abstractmethod + def run(self, instance: Recorder) -> None: + """Handle the task.""" + + +@dataclass +class ClearStatisticsTask(RecorderTask): + """Object to store statistics_ids which for which to remove statistics.""" + + statistic_ids: list[str] + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + statistics.clear_statistics(instance, self.statistic_ids) + + +@dataclass +class UpdateStatisticsMetadataTask(RecorderTask): + """Object to store statistics_id and unit for update of statistics metadata.""" + + statistic_id: str + unit_of_measurement: str | None + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + statistics.update_statistics_metadata( + instance, self.statistic_id, self.unit_of_measurement + ) + + +@dataclass +class PurgeTask(RecorderTask): + """Object to store information about purge task.""" + + purge_before: datetime + repack: bool + apply_filter: bool + + def run(self, instance: Recorder) -> None: + """Purge the database.""" + assert instance.get_session is not None + + if purge.purge_old_data( + instance, self.purge_before, self.repack, self.apply_filter + ): + with instance.get_session() as session: + instance.run_history.load_from_db(session) + # We always need to do the db cleanups after a purge + # is finished to ensure the WAL checkpoint and other + # tasks happen after a vacuum. + periodic_db_cleanups(instance) + return + # Schedule a new purge task if this one didn't finish + instance.queue.put(PurgeTask(self.purge_before, self.repack, self.apply_filter)) + + +@dataclass +class PurgeEntitiesTask(RecorderTask): + """Object to store entity information about purge task.""" + + entity_filter: Callable[[str], bool] + + def run(self, instance: Recorder) -> None: + """Purge entities from the database.""" + if purge.purge_entity_data(instance, self.entity_filter): + return + # Schedule a new purge task if this one didn't finish + instance.queue.put(PurgeEntitiesTask(self.entity_filter)) + + +@dataclass +class PerodicCleanupTask(RecorderTask): + """An object to insert into the recorder to trigger cleanup tasks when auto purge is disabled.""" + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + periodic_db_cleanups(instance) + + +@dataclass +class StatisticsTask(RecorderTask): + """An object to insert into the recorder queue to run a statistics task.""" + + start: datetime + + def run(self, instance: Recorder) -> None: + """Run statistics task.""" + if statistics.compile_statistics(instance, self.start): + return + # Schedule a new statistics task if this one didn't finish + instance.queue.put(StatisticsTask(self.start)) + + +@dataclass +class ExternalStatisticsTask(RecorderTask): + """An object to insert into the recorder queue to run an external statistics task.""" + + metadata: StatisticMetaData + statistics: Iterable[StatisticData] + + def run(self, instance: Recorder) -> None: + """Run statistics task.""" + if statistics.add_external_statistics(instance, self.metadata, self.statistics): + return + # Schedule a new statistics task if this one didn't finish + instance.queue.put(ExternalStatisticsTask(self.metadata, self.statistics)) + + +@dataclass +class AdjustStatisticsTask(RecorderTask): + """An 
object to insert into the recorder queue to run an adjust statistics task.""" + + statistic_id: str + start_time: datetime + sum_adjustment: float + + def run(self, instance: Recorder) -> None: + """Run statistics task.""" + if statistics.adjust_statistics( + instance, + self.statistic_id, + self.start_time, + self.sum_adjustment, + ): + return + # Schedule a new adjust statistics task if this one didn't finish + instance.queue.put( + AdjustStatisticsTask( + self.statistic_id, self.start_time, self.sum_adjustment + ) + ) + + +@dataclass +class WaitTask(RecorderTask): + """An object to insert into the recorder queue to tell it set the _queue_watch event.""" + + commit_before = False + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + instance._queue_watch.set() # pylint: disable=[protected-access] + + +@dataclass +class DatabaseLockTask(RecorderTask): + """An object to insert into the recorder queue to prevent writes to the database.""" + + database_locked: asyncio.Event + database_unlock: threading.Event + queue_overflow: bool + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + instance._lock_database(self) # pylint: disable=[protected-access] + + +@dataclass +class StopTask(RecorderTask): + """An object to insert into the recorder queue to stop the event handler.""" + + commit_before = False + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + instance.stop_requested = True + + +@dataclass +class EventTask(RecorderTask): + """An event to be processed.""" + + event: Event + commit_before = False + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + # pylint: disable-next=[protected-access] + instance._process_one_event(self.event) + + +@dataclass +class KeepAliveTask(RecorderTask): + """A keep alive to be sent.""" + + commit_before = False + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + # pylint: disable-next=[protected-access] + instance._send_keep_alive() + + +@dataclass +class CommitTask(RecorderTask): + """Commit the event session.""" + + commit_before = False + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + # pylint: disable-next=[protected-access] + instance._commit_event_session_or_retry() + + +@dataclass +class AddRecorderPlatformTask(RecorderTask): + """Add a recorder platform.""" + + domain: str + platform: Any + commit_before = False + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + hass = instance.hass + domain = self.domain + platform = self.platform + + platforms: dict[str, Any] = hass.data[DOMAIN] + platforms[domain] = platform + if hasattr(self.platform, "exclude_attributes"): + hass.data[EXCLUDE_ATTRIBUTES][domain] = platform.exclude_attributes(hass) diff --git a/mypy.ini b/mypy.ini index a4f7ef80d30..b55d5e74998 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1732,6 +1732,17 @@ no_implicit_optional = true warn_return_any = true warn_unreachable = true +[mypy-homeassistant.components.recorder.core] +check_untyped_defs = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +no_implicit_optional = true +warn_return_any = true +warn_unreachable = true + [mypy-homeassistant.components.recorder.backup] check_untyped_defs = true disallow_incomplete_defs = true @@ -1842,6 +1853,17 @@ no_implicit_optional = true warn_return_any = true warn_unreachable = true +[mypy-homeassistant.components.recorder.tasks] 
+check_untyped_defs = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +no_implicit_optional = true +warn_return_any = true +warn_unreachable = true + [mypy-homeassistant.components.recorder.util] check_untyped_defs = true disallow_incomplete_defs = true diff --git a/tests/components/history/test_init.py b/tests/components/history/test_init.py index 4ae1354464d..630d0b84889 100644 --- a/tests/components/history/test_init.py +++ b/tests/components/history/test_init.py @@ -459,23 +459,28 @@ def test_get_significant_states_only(hass_history): points.append(start + timedelta(minutes=i)) states = [] - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=start + ): set_state("123", attributes={"attribute": 10.64}) with patch( - "homeassistant.components.recorder.dt_util.utcnow", return_value=points[0] + "homeassistant.components.recorder.core.dt_util.utcnow", + return_value=points[0], ): # Attributes are different, state not states.append(set_state("123", attributes={"attribute": 21.42})) with patch( - "homeassistant.components.recorder.dt_util.utcnow", return_value=points[1] + "homeassistant.components.recorder.core.dt_util.utcnow", + return_value=points[1], ): # state is different, attributes not states.append(set_state("32", attributes={"attribute": 21.42})) with patch( - "homeassistant.components.recorder.dt_util.utcnow", return_value=points[2] + "homeassistant.components.recorder.core.dt_util.utcnow", + return_value=points[2], ): # everything is different states.append(set_state("412", attributes={"attribute": 54.23})) @@ -536,7 +541,9 @@ def record_states(hass): four = three + timedelta(seconds=1) states = {therm: [], therm2: [], mp: [], mp2: [], mp3: [], script_c: []} - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one + ): states[mp].append( set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)}) ) @@ -553,7 +560,9 @@ def record_states(hass): set_state(therm, 20, attributes={"current_temperature": 19.5}) ) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two + ): # This state will be skipped only different in time set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt3)}) # This state will be skipped because domain is excluded @@ -568,7 +577,9 @@ def record_states(hass): set_state(therm2, 20, attributes={"current_temperature": 19}) ) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three + ): states[mp].append( set_state(mp, "Netflix", attributes={"media_title": str(sentinel.mt4)}) ) diff --git a/tests/components/recorder/test_history.py b/tests/components/recorder/test_history.py index 8f22447cd8e..5e29fb092b1 100644 --- a/tests/components/recorder/test_history.py +++ b/tests/components/recorder/test_history.py @@ -52,7 +52,7 @@ async def _async_get_states( def _add_db_entries( hass: ha.HomeAssistant, point: datetime, entity_ids: list[str] ) -> None: - with recorder.session_scope(hass=hass) as session: + with session_scope(hass=hass) as session: for idx, entity_id in 
enumerate(entity_ids): session.add( Events( @@ -87,7 +87,9 @@ def _setup_get_states(hass): """Set up for testing get_states.""" states = [] now = dt_util.utcnow() - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=now + ): for i in range(5): state = ha.State( f"test.point_in_time_{i % 5}", @@ -102,7 +104,9 @@ def _setup_get_states(hass): wait_recording_done(hass) future = now + timedelta(seconds=1) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=future): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=future + ): for i in range(5): state = ha.State( f"test.point_in_time_{i % 5}", @@ -122,7 +126,7 @@ def test_get_full_significant_states_with_session_entity_no_matches(hass_recorde hass = hass_recorder() now = dt_util.utcnow() time_before_recorder_ran = now - timedelta(days=1000) - with recorder.session_scope(hass=hass) as session: + with session_scope(hass=hass) as session: assert ( history.get_full_significant_states_with_session( hass, session, time_before_recorder_ran, now, entity_ids=["demo.id"] @@ -148,7 +152,7 @@ def test_significant_states_with_session_entity_minimal_response_no_matches( hass = hass_recorder() now = dt_util.utcnow() time_before_recorder_ran = now - timedelta(days=1000) - with recorder.session_scope(hass=hass) as session: + with session_scope(hass=hass) as session: assert ( history.get_significant_states_with_session( hass, @@ -197,11 +201,15 @@ def test_state_changes_during_period(hass_recorder, attributes, no_attributes, l point = start + timedelta(seconds=1) end = point + timedelta(seconds=1) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=start + ): set_state("idle") set_state("YouTube") - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=point + ): states = [ set_state("idle"), set_state("Netflix"), @@ -209,7 +217,9 @@ def test_state_changes_during_period(hass_recorder, attributes, no_attributes, l set_state("YouTube"), ] - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=end): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=end + ): set_state("Netflix") set_state("Plex") @@ -235,11 +245,15 @@ def test_state_changes_during_period_descending(hass_recorder): point = start + timedelta(seconds=1) end = point + timedelta(seconds=1) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=start + ): set_state("idle") set_state("YouTube") - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=point + ): states = [ set_state("idle"), set_state("Netflix"), @@ -247,7 +261,9 @@ def test_state_changes_during_period_descending(hass_recorder): set_state("YouTube"), ] - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=end): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=end + ): set_state("Netflix") set_state("Plex") @@ -277,14 +293,20 @@ def test_get_last_state_changes(hass_recorder): point = start 
+ timedelta(minutes=1) point2 = point + timedelta(minutes=1) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=start + ): set_state("1") states = [] - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=point + ): states.append(set_state("2")) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point2): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=point2 + ): states.append(set_state("3")) hist = history.get_last_state_changes(hass, 2, entity_id) @@ -310,10 +332,14 @@ def test_ensure_state_can_be_copied(hass_recorder): start = dt_util.utcnow() - timedelta(minutes=2) point = start + timedelta(minutes=1) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=start + ): set_state("1") - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=point): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=point + ): set_state("2") hist = history.get_last_state_changes(hass, 2, entity_id) @@ -486,23 +512,28 @@ def test_get_significant_states_only(hass_recorder): points.append(start + timedelta(minutes=i)) states = [] - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=start): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=start + ): set_state("123", attributes={"attribute": 10.64}) with patch( - "homeassistant.components.recorder.dt_util.utcnow", return_value=points[0] + "homeassistant.components.recorder.core.dt_util.utcnow", + return_value=points[0], ): # Attributes are different, state not states.append(set_state("123", attributes={"attribute": 21.42})) with patch( - "homeassistant.components.recorder.dt_util.utcnow", return_value=points[1] + "homeassistant.components.recorder.core.dt_util.utcnow", + return_value=points[1], ): # state is different, attributes not states.append(set_state("32", attributes={"attribute": 21.42})) with patch( - "homeassistant.components.recorder.dt_util.utcnow", return_value=points[2] + "homeassistant.components.recorder.core.dt_util.utcnow", + return_value=points[2], ): # everything is different states.append(set_state("412", attributes={"attribute": 54.23})) @@ -547,7 +578,9 @@ def record_states(hass): four = three + timedelta(seconds=1) states = {therm: [], therm2: [], mp: [], mp2: [], mp3: [], script_c: []} - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one + ): states[mp].append( set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)}) ) @@ -564,7 +597,9 @@ def record_states(hass): set_state(therm, 20, attributes={"current_temperature": 19.5}) ) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two + ): # This state will be skipped only different in time set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt3)}) # This state will be skipped because domain is excluded @@ -579,7 +614,9 @@ def record_states(hass): set_state(therm2, 20, attributes={"current_temperature": 
19}) ) - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three + ): states[mp].append( set_state(mp, "Netflix", attributes={"media_title": str(sentinel.mt4)}) ) diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 6d4f2e1106a..fc2c03e0039 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -20,7 +20,6 @@ from homeassistant.components.recorder import ( CONF_DB_URL, CONFIG_SCHEMA, DOMAIN, - KEEPALIVE_TIME, SERVICE_DISABLE, SERVICE_ENABLE, SERVICE_PURGE, @@ -29,7 +28,7 @@ from homeassistant.components.recorder import ( Recorder, get_instance, ) -from homeassistant.components.recorder.const import DATA_INSTANCE +from homeassistant.components.recorder.const import DATA_INSTANCE, KEEPALIVE_TIME from homeassistant.components.recorder.models import ( EventData, Events, @@ -196,7 +195,7 @@ async def test_saving_many_states( with patch.object( hass.data[DATA_INSTANCE].event_session, "expire_all" - ) as expire_all, patch.object(recorder, "EXPIRE_AFTER_COMMITS", 2): + ) as expire_all, patch.object(recorder.core, "EXPIRE_AFTER_COMMITS", 2): for _ in range(3): hass.states.async_set(entity_id, "on", attributes) await async_wait_recording_done(hass) @@ -611,7 +610,7 @@ def test_saving_state_and_removing_entity(hass, hass_recorder): def test_recorder_setup_failure(hass): """Test some exceptions.""" with patch.object(Recorder, "_setup_connection") as setup, patch( - "homeassistant.components.recorder.time.sleep" + "homeassistant.components.recorder.core.time.sleep" ): setup.side_effect = ImportError("driver not found") rec = _default_recorder(hass) @@ -625,7 +624,7 @@ def test_recorder_setup_failure(hass): def test_recorder_setup_failure_without_event_listener(hass): """Test recorder setup failure when the event listener is not setup.""" with patch.object(Recorder, "_setup_connection") as setup, patch( - "homeassistant.components.recorder.time.sleep" + "homeassistant.components.recorder.core.time.sleep" ): setup.side_effect = ImportError("driver not found") rec = _default_recorder(hass) @@ -685,7 +684,7 @@ def test_auto_purge(hass_recorder): with patch( "homeassistant.components.recorder.purge.purge_old_data", return_value=True ) as purge_old_data, patch( - "homeassistant.components.recorder.periodic_db_cleanups" + "homeassistant.components.recorder.tasks.periodic_db_cleanups" ) as periodic_db_cleanups: # Advance one day, and the purge task should run test_time = test_time + timedelta(days=1) @@ -741,11 +740,11 @@ def test_auto_purge_auto_repack_on_second_sunday(hass_recorder): run_tasks_at_time(hass, test_time) with patch( - "homeassistant.components.recorder.is_second_sunday", return_value=True + "homeassistant.components.recorder.core.is_second_sunday", return_value=True ), patch( "homeassistant.components.recorder.purge.purge_old_data", return_value=True ) as purge_old_data, patch( - "homeassistant.components.recorder.periodic_db_cleanups" + "homeassistant.components.recorder.tasks.periodic_db_cleanups" ) as periodic_db_cleanups: # Advance one day, and the purge task should run test_time = test_time + timedelta(days=1) @@ -779,11 +778,11 @@ def test_auto_purge_auto_repack_disabled_on_second_sunday(hass_recorder): run_tasks_at_time(hass, test_time) with patch( - "homeassistant.components.recorder.is_second_sunday", return_value=True + "homeassistant.components.recorder.core.is_second_sunday", 
return_value=True ), patch( "homeassistant.components.recorder.purge.purge_old_data", return_value=True ) as purge_old_data, patch( - "homeassistant.components.recorder.periodic_db_cleanups" + "homeassistant.components.recorder.tasks.periodic_db_cleanups" ) as periodic_db_cleanups: # Advance one day, and the purge task should run test_time = test_time + timedelta(days=1) @@ -817,11 +816,12 @@ def test_auto_purge_no_auto_repack_on_not_second_sunday(hass_recorder): run_tasks_at_time(hass, test_time) with patch( - "homeassistant.components.recorder.is_second_sunday", return_value=False + "homeassistant.components.recorder.core.is_second_sunday", + return_value=False, ), patch( "homeassistant.components.recorder.purge.purge_old_data", return_value=True ) as purge_old_data, patch( - "homeassistant.components.recorder.periodic_db_cleanups" + "homeassistant.components.recorder.tasks.periodic_db_cleanups" ) as periodic_db_cleanups: # Advance one day, and the purge task should run test_time = test_time + timedelta(days=1) @@ -856,7 +856,7 @@ def test_auto_purge_disabled(hass_recorder): with patch( "homeassistant.components.recorder.purge.purge_old_data", return_value=True ) as purge_old_data, patch( - "homeassistant.components.recorder.periodic_db_cleanups" + "homeassistant.components.recorder.tasks.periodic_db_cleanups" ) as periodic_db_cleanups: # Advance one day, and the purge task should run test_time = test_time + timedelta(days=1) @@ -924,7 +924,9 @@ def test_auto_statistics(hass_recorder): def test_statistics_runs_initiated(hass_recorder): """Test statistics_runs is initiated when DB is created.""" now = dt_util.utcnow() - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=now + ): hass = hass_recorder() wait_recording_done(hass) @@ -944,7 +946,9 @@ def test_compile_missing_statistics(tmpdir): test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" - with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now): + with patch( + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=now + ): hass = get_test_home_assistant() setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}}) @@ -963,7 +967,7 @@ def test_compile_missing_statistics(tmpdir): hass.stop() with patch( - "homeassistant.components.recorder.dt_util.utcnow", + "homeassistant.components.recorder.core.dt_util.utcnow", return_value=now + timedelta(hours=1), ): @@ -1356,8 +1360,8 @@ async def test_database_lock_and_overflow( instance: Recorder = hass.data[DATA_INSTANCE] - with patch.object(recorder, "MAX_QUEUE_BACKLOG", 1), patch.object( - recorder, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.1 + with patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1), patch.object( + recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.1 ): await instance.lock_database() @@ -1382,7 +1386,7 @@ async def test_database_lock_timeout(hass, recorder_mock): instance: Recorder = hass.data[DATA_INSTANCE] - class BlockQueue(recorder.RecorderTask): + class BlockQueue(recorder.tasks.RecorderTask): event: threading.Event = threading.Event() def run(self, instance: Recorder) -> None: @@ -1390,7 +1394,7 @@ async def test_database_lock_timeout(hass, recorder_mock): block_task = BlockQueue() instance.queue.put(block_task) - with patch.object(recorder, "DB_LOCK_TIMEOUT", 0.1): + with patch.object(recorder.core, "DB_LOCK_TIMEOUT", 0.1): try: with pytest.raises(TimeoutError): 
await instance.lock_database() @@ -1435,7 +1439,7 @@ async def test_database_connection_keep_alive( await instance.async_recorder_ready.wait() async_fire_time_changed( - hass, dt_util.utcnow() + timedelta(seconds=recorder.KEEPALIVE_TIME) + hass, dt_util.utcnow() + timedelta(seconds=recorder.core.KEEPALIVE_TIME) ) await async_wait_recording_done(hass) assert "Sending keepalive" in caplog.text @@ -1452,7 +1456,7 @@ async def test_database_connection_keep_alive_disabled_on_sqlite( await instance.async_recorder_ready.wait() async_fire_time_changed( - hass, dt_util.utcnow() + timedelta(seconds=recorder.KEEPALIVE_TIME) + hass, dt_util.utcnow() + timedelta(seconds=recorder.core.KEEPALIVE_TIME) ) await async_wait_recording_done(hass) assert "Sending keepalive" not in caplog.text @@ -1482,7 +1486,7 @@ def test_deduplication_event_data_inside_commit_interval(hass_recorder, caplog): # Patch STATE_ATTRIBUTES_ID_CACHE_SIZE since otherwise # the CI can fail because the test takes too long to run -@patch("homeassistant.components.recorder.STATE_ATTRIBUTES_ID_CACHE_SIZE", 5) +@patch("homeassistant.components.recorder.core.STATE_ATTRIBUTES_ID_CACHE_SIZE", 5) def test_deduplication_state_attributes_inside_commit_interval(hass_recorder, caplog): """Test deduplication of state attributes inside the commit interval.""" hass = hass_recorder() diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 6b963941263..1b84eb5d171 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -44,7 +44,8 @@ async def test_schema_update_calls(hass): assert recorder.util.async_migration_in_progress(hass) is False with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch( - "homeassistant.components.recorder.create_engine", new=create_engine_test + "homeassistant.components.recorder.core.create_engine", + new=create_engine_test, ), patch( "homeassistant.components.recorder.migration._apply_update", wraps=migration._apply_update, @@ -67,10 +68,10 @@ async def test_migration_in_progress(hass): """Test that we can check for migration in progress.""" assert recorder.util.async_migration_in_progress(hass) is False - with patch( - "homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", - True, - ), patch("homeassistant.components.recorder.create_engine", new=create_engine_test): + with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True,), patch( + "homeassistant.components.recorder.core.create_engine", + new=create_engine_test, + ): await async_setup_component( hass, "recorder", {"recorder": {"db_url": "sqlite://"}} ) @@ -86,7 +87,8 @@ async def test_database_migration_failed(hass): assert recorder.util.async_migration_in_progress(hass) is False with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch( - "homeassistant.components.recorder.create_engine", new=create_engine_test + "homeassistant.components.recorder.core.create_engine", + new=create_engine_test, ), patch( "homeassistant.components.recorder.migration._apply_update", side_effect=ValueError, @@ -125,7 +127,7 @@ async def test_database_migration_encounters_corruption(hass): "homeassistant.components.recorder.migration.migrate_schema", side_effect=sqlite3_exception, ), patch( - "homeassistant.components.recorder.move_away_broken_database" + "homeassistant.components.recorder.core.move_away_broken_database" ) as move_away: await async_setup_component( hass, "recorder", {"recorder": {"db_url": "sqlite://"}} @@ -149,7 
+151,7 @@ async def test_database_migration_encounters_corruption_not_sqlite(hass):
         "homeassistant.components.recorder.migration.migrate_schema",
         side_effect=DatabaseError("statement", {}, []),
     ), patch(
-        "homeassistant.components.recorder.move_away_broken_database"
+        "homeassistant.components.recorder.core.move_away_broken_database"
     ) as move_away, patch(
         "homeassistant.components.persistent_notification.create", side_effect=pn.create
     ) as mock_create, patch(
@@ -176,10 +178,10 @@ async def test_events_during_migration_are_queued(hass):
 
     assert recorder.util.async_migration_in_progress(hass) is False
 
-    with patch(
-        "homeassistant.components.recorder.ALLOW_IN_MEMORY_DB",
-        True,
-    ), patch("homeassistant.components.recorder.create_engine", new=create_engine_test):
+    with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True,), patch(
+        "homeassistant.components.recorder.core.create_engine",
+        new=create_engine_test,
+    ):
         await async_setup_component(
             hass,
             "recorder",
@@ -207,8 +209,9 @@ async def test_events_during_migration_queue_exhausted(hass):
     assert recorder.util.async_migration_in_progress(hass) is False
 
     with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
-        "homeassistant.components.recorder.create_engine", new=create_engine_test
-    ), patch.object(recorder, "MAX_QUEUE_BACKLOG", 1):
+        "homeassistant.components.recorder.core.create_engine",
+        new=create_engine_test,
+    ), patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1):
         await async_setup_component(
             hass,
             "recorder",
@@ -296,7 +299,8 @@ async def test_schema_migrate(hass, start_version):
         migration_done.set()
 
     with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
-        "homeassistant.components.recorder.create_engine", new=_create_engine_test
+        "homeassistant.components.recorder.core.create_engine",
+        new=_create_engine_test,
     ), patch(
         "homeassistant.components.recorder.Recorder._setup_run",
         side_effect=_mock_setup_run,
diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py
index d946d1e2a14..8ac2f3a783c 100644
--- a/tests/components/recorder/test_purge.py
+++ b/tests/components/recorder/test_purge.py
@@ -9,7 +9,6 @@ from sqlalchemy.exc import DatabaseError, OperationalError
 from sqlalchemy.orm.session import Session
 
 from homeassistant.components import recorder
-from homeassistant.components.recorder import PurgeTask
 from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE
 from homeassistant.components.recorder.models import (
     Events,
@@ -20,6 +19,7 @@ from homeassistant.components.recorder.models import (
     StatisticsShortTerm,
 )
 from homeassistant.components.recorder.purge import purge_old_data
+from homeassistant.components.recorder.tasks import PurgeTask
 from homeassistant.components.recorder.util import session_scope
 from homeassistant.const import EVENT_STATE_CHANGED, STATE_ON
 from homeassistant.core import HomeAssistant
@@ -128,7 +128,7 @@ async def test_purge_old_states_encouters_database_corruption(
     sqlite3_exception.__cause__ = sqlite3.DatabaseError()
 
     with patch(
-        "homeassistant.components.recorder.move_away_broken_database"
+        "homeassistant.components.recorder.core.move_away_broken_database"
     ) as move_away, patch(
         "homeassistant.components.recorder.purge.purge_old_data",
         side_effect=sqlite3_exception,
@@ -406,7 +406,7 @@ async def test_purge_edge_case(
     """Test states and events are purged even if they occurred shortly before purge_before."""
 
     async def _add_db_entries(hass: HomeAssistant, timestamp: datetime) -> None:
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             session.add(
                 Events(
                     event_id=1001,
@@ -477,7 +477,7 @@ async def test_purge_cutoff_date(
         timestamp_keep = cutoff
         timestamp_purge = cutoff - timedelta(microseconds=1)
 
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             session.add(
                 Events(
                     event_id=1000,
@@ -626,7 +626,7 @@ async def test_purge_filtered_states(
     assert instance.entity_filter("sensor.excluded") is False
 
     def _add_db_entries(hass: HomeAssistant) -> None:
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             # Add states and state_changed events that should be purged
             for days in range(1, 4):
                 timestamp = dt_util.utcnow() - timedelta(days=days)
@@ -820,7 +820,7 @@ async def test_purge_filtered_states_to_empty(
     assert instance.entity_filter("sensor.excluded") is False
 
     def _add_db_entries(hass: HomeAssistant) -> None:
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             # Add states and state_changed events that should be purged
             for days in range(1, 4):
                 timestamp = dt_util.utcnow() - timedelta(days=days)
@@ -877,7 +877,7 @@ async def test_purge_without_state_attributes_filtered_states_to_empty(
     assert instance.entity_filter("sensor.old_format") is False
 
    def _add_db_entries(hass: HomeAssistant) -> None:
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             # Add states and state_changed events that should be purged
             # in the legacy format
             timestamp = dt_util.utcnow() - timedelta(days=5)
@@ -944,7 +944,7 @@ async def test_purge_filtered_events(
     await async_setup_recorder_instance(hass, config)
 
     def _add_db_entries(hass: HomeAssistant) -> None:
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             # Add events that should be purged
             for days in range(1, 4):
                 timestamp = dt_util.utcnow() - timedelta(days=days)
@@ -1038,7 +1038,7 @@ async def test_purge_filtered_events_state_changed(
     assert instance.entity_filter("sensor.excluded") is True
 
     def _add_db_entries(hass: HomeAssistant) -> None:
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             # Add states and state_changed events that should be purged
             for days in range(1, 4):
                 timestamp = dt_util.utcnow() - timedelta(days=days)
@@ -1154,7 +1154,7 @@ async def test_purge_entities(
     await async_wait_purge_done(hass)
 
     def _add_purge_records(hass: HomeAssistant) -> None:
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             # Add states and state_changed events that should be purged
             for days in range(1, 4):
                 timestamp = dt_util.utcnow() - timedelta(days=days)
@@ -1186,7 +1186,7 @@ async def test_purge_entities(
             )
 
     def _add_keep_records(hass: HomeAssistant) -> None:
-        with recorder.session_scope(hass=hass) as session:
+        with session_scope(hass=hass) as session:
             # Add states and state_changed events that should be kept
             timestamp = dt_util.utcnow() - timedelta(days=2)
             for event_id in range(200, 210):
@@ -1289,7 +1289,8 @@ async def _add_test_states(hass: HomeAssistant):
             attributes = {"dontpurgeme": True, **base_attributes}
 
         with patch(
-            "homeassistant.components.recorder.dt_util.utcnow", return_value=timestamp
+            "homeassistant.components.recorder.core.dt_util.utcnow",
+            return_value=timestamp,
         ):
             await set_state("test.recorder2", state, attributes=attributes)
 
@@ -1304,7 +1305,7 @@ async def _add_test_events(hass: HomeAssistant):
     await hass.async_block_till_done()
     await async_wait_recording_done(hass)
 
-    with recorder.session_scope(hass=hass) as session:
+    with session_scope(hass=hass) as session:
         for event_id in range(6):
             if event_id < 2:
                 timestamp = eleven_days_ago
@@ -1335,7 +1336,7 @@ async def _add_test_statistics(hass: HomeAssistant):
     await hass.async_block_till_done()
     await async_wait_recording_done(hass)
 
-    with recorder.session_scope(hass=hass) as session:
+    with session_scope(hass=hass) as session:
         for event_id in range(6):
             if event_id < 2:
                 timestamp = eleven_days_ago
@@ -1364,7 +1365,7 @@ async def _add_test_recorder_runs(hass: HomeAssistant):
     await hass.async_block_till_done()
     await async_wait_recording_done(hass)
 
-    with recorder.session_scope(hass=hass) as session:
+    with session_scope(hass=hass) as session:
         for rec_id in range(6):
             if rec_id < 2:
                 timestamp = eleven_days_ago
@@ -1391,7 +1392,7 @@ async def _add_test_statistics_runs(hass: HomeAssistant):
     await hass.async_block_till_done()
     await async_wait_recording_done(hass)
 
-    with recorder.session_scope(hass=hass) as session:
+    with session_scope(hass=hass) as session:
         for rec_id in range(6):
             if rec_id < 2:
                 timestamp = eleven_days_ago
diff --git a/tests/components/recorder/test_statistics.py b/tests/components/recorder/test_statistics.py
index 58f848edc5a..dc13f2abb6a 100644
--- a/tests/components/recorder/test_statistics.py
+++ b/tests/components/recorder/test_statistics.py
@@ -839,7 +839,9 @@ def record_states(hass):
     four = three + timedelta(seconds=15 * 5)
 
     states = {mp: [], sns1: [], sns2: [], sns3: [], sns4: []}
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
+    ):
         states[mp].append(
             set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
         )
@@ -851,13 +853,17 @@ def record_states(hass):
         states[sns3].append(set_state(sns3, "10", attributes=sns3_attr))
         states[sns4].append(set_state(sns4, "10", attributes=sns4_attr))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
+    ):
         states[sns1].append(set_state(sns1, "15", attributes=sns1_attr))
         states[sns2].append(set_state(sns2, "15", attributes=sns2_attr))
         states[sns3].append(set_state(sns3, "15", attributes=sns3_attr))
         states[sns4].append(set_state(sns4, "15", attributes=sns4_attr))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
+    ):
         states[sns1].append(set_state(sns1, "20", attributes=sns1_attr))
         states[sns2].append(set_state(sns2, "20", attributes=sns2_attr))
         states[sns3].append(set_state(sns3, "20", attributes=sns3_attr))
diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py
index f6d2fa0a99d..07334fef1c3 100644
--- a/tests/components/recorder/test_util.py
+++ b/tests/components/recorder/test_util.py
@@ -45,7 +45,7 @@ def test_recorder_bad_commit(hass_recorder):
         session.execute(text("select * from notthere"))
 
     with patch(
-        "homeassistant.components.recorder.time.sleep"
+        "homeassistant.components.recorder.core.time.sleep"
     ) as e_mock, util.session_scope(hass=hass) as session:
         res = util.commit(session, work)
         assert res is False
@@ -66,7 +66,7 @@ def test_recorder_bad_execute(hass_recorder):
     mck1.to_native = to_native
 
     with pytest.raises(SQLAlchemyError), patch(
-        "homeassistant.components.recorder.time.sleep"
+        "homeassistant.components.recorder.core.time.sleep"
     ) as e_mock:
         util.execute((mck1,), to_native=True)
 
@@ -148,7 +148,7 @@ async def test_last_run_was_recently_clean(
         "homeassistant.components.recorder.util.last_run_was_recently_clean",
         wraps=_last_run_was_recently_clean,
     ) as last_run_was_recently_clean_mock, patch(
-        "homeassistant.components.recorder.dt_util.utcnow",
+        "homeassistant.components.recorder.core.dt_util.utcnow",
         return_value=thirty_min_future_time,
     ):
         hass = await async_test_home_assistant(None)
diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py
index 08427e60911..fe197cb72e6 100644
--- a/tests/components/recorder/test_websocket_api.py
+++ b/tests/components/recorder/test_websocket_api.py
@@ -323,9 +323,10 @@ async def test_recorder_info_migration_queue_exhausted(hass, hass_ws_client):
     with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
         "homeassistant.components.recorder.Recorder.async_periodic_statistics"
     ), patch(
-        "homeassistant.components.recorder.create_engine", new=create_engine_test
+        "homeassistant.components.recorder.core.create_engine",
+        new=create_engine_test,
     ), patch.object(
-        recorder, "MAX_QUEUE_BACKLOG", 1
+        recorder.core, "MAX_QUEUE_BACKLOG", 1
     ), patch(
         "homeassistant.components.recorder.migration.migrate_schema",
         wraps=stalled_migration,
@@ -384,7 +385,7 @@ async def test_backup_start_timeout(
     # Ensure there are no queued events
     await async_wait_recording_done(hass)
 
-    with patch.object(recorder, "DB_LOCK_TIMEOUT", 0):
+    with patch.object(recorder.core, "DB_LOCK_TIMEOUT", 0):
         try:
             await client.send_json({"id": 1, "type": "backup/start"})
             response = await client.receive_json()
diff --git a/tests/components/sensor/test_recorder.py b/tests/components/sensor/test_recorder.py
index 58a991c5f53..b4abf4d9ec3 100644
--- a/tests/components/sensor/test_recorder.py
+++ b/tests/components/sensor/test_recorder.py
@@ -172,7 +172,9 @@ def test_compile_hourly_statistics_purged_state_changes(
     mean = min = max = float(hist["sensor.test1"][-1].state)
 
     # Purge all states from the database
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=four):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=four
+    ):
         hass.services.call("recorder", "purge", {"keep_days": 0})
         hass.block_till_done()
         wait_recording_done(hass)
@@ -2747,17 +2749,23 @@ def record_states(hass, zero, entity_id, attributes, seq=None):
     four = three + timedelta(seconds=10 * 5)
 
     states = {entity_id: []}
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
+    ):
         states[entity_id].append(
             set_state(entity_id, str(seq[0]), attributes=attributes)
         )
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
+    ):
         states[entity_id].append(
             set_state(entity_id, str(seq[1]), attributes=attributes)
         )
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
+    ):
         states[entity_id].append(
             set_state(entity_id, str(seq[2]), attributes=attributes)
         )
@@ -3339,35 +3347,53 @@ def record_meter_states(hass, zero, entity_id, _attributes, seq):
         attributes["last_reset"] = zero.isoformat()
 
     states = {entity_id: []}
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=zero):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=zero
+    ):
         states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
+    ):
         states[entity_id].append(set_state(entity_id, seq[1], attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
+    ):
         states[entity_id].append(set_state(entity_id, seq[2], attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
+    ):
         states[entity_id].append(set_state(entity_id, seq[3], attributes=attributes))
 
     attributes = dict(_attributes)
     if "last_reset" in _attributes:
         attributes["last_reset"] = four.isoformat()
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=four):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=four
+    ):
         states[entity_id].append(set_state(entity_id, seq[4], attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=five):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=five
+    ):
         states[entity_id].append(set_state(entity_id, seq[5], attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=six):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=six
+    ):
         states[entity_id].append(set_state(entity_id, seq[6], attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=seven):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=seven
+    ):
         states[entity_id].append(set_state(entity_id, seq[7], attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=eight):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=eight
+    ):
         states[entity_id].append(set_state(entity_id, seq[8], attributes=attributes))
 
     return four, eight, states
@@ -3386,7 +3412,9 @@ def record_meter_state(hass, zero, entity_id, attributes, seq):
         return hass.states.get(entity_id)
 
     states = {entity_id: []}
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=zero):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=zero
+    ):
         states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
 
     return states
@@ -3410,13 +3438,19 @@ def record_states_partially_unavailable(hass, zero, entity_id, attributes):
     four = three + timedelta(seconds=15 * 5)
 
     states = {entity_id: []}
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
+    ):
         states[entity_id].append(set_state(entity_id, "10", attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
+    ):
         states[entity_id].append(set_state(entity_id, "25", attributes=attributes))
 
-    with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
+    with patch(
+        "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
+    ):
         states[entity_id].append(
             set_state(entity_id, STATE_UNAVAILABLE, attributes=attributes)
         )