diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index 32a6e9c24dc..db44baee06c 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -14,7 +14,7 @@ import time
 from typing import Any, TypeVar
 
 import async_timeout
-from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select
+from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
 from sqlalchemy.engine import Engine
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.orm import scoped_session, sessionmaker
@@ -62,17 +62,10 @@ from .db_schema import (
     States,
     StatesMeta,
     Statistics,
-    StatisticsRuns,
     StatisticsShortTerm,
 )
 from .executor import DBInterruptibleThreadPoolExecutor
-from .models import (
-    DatabaseEngine,
-    StatisticData,
-    StatisticMetaData,
-    UnsupportedDialect,
-    process_timestamp,
-)
+from .models import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect
 from .pool import POOL_SIZE, MutexPool, RecorderPool
 from .queries import (
     has_entity_ids_to_migrate,
@@ -93,6 +86,7 @@ from .tasks import (
     ChangeStatisticsUnitTask,
     ClearStatisticsTask,
     CommitTask,
+    CompileMissingStatisticsTask,
     DatabaseLockTask,
     EntityIDMigrationTask,
     EventsContextIDMigrationTask,
@@ -680,9 +674,7 @@ class Recorder(threading.Thread):
         self._activate_and_set_db_ready()
 
         # Catch up with missed statistics
-        with session_scope(session=self.get_session()) as session:
-            self._schedule_compile_missing_statistics(session)
-
+        self._schedule_compile_missing_statistics()
         _LOGGER.debug("Recorder processing the queue")
         self._adjust_lru_size()
         self.hass.add_job(self._async_set_recorder_ready_migration_done)
@@ -1295,26 +1287,9 @@ class Recorder(threading.Thread):
 
         self._open_event_session()
 
-    def _schedule_compile_missing_statistics(self, session: Session) -> None:
+    def _schedule_compile_missing_statistics(self) -> None:
         """Add tasks for missing statistics runs."""
-        now = dt_util.utcnow()
-        last_period_minutes = now.minute - now.minute % 5
-        last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0)
-        start = now - timedelta(days=self.keep_days)
-        start = start.replace(minute=0, second=0, microsecond=0)
-
-        # Find the newest statistics run, if any
-        # https://github.com/sqlalchemy/sqlalchemy/issues/9189
-        # pylint: disable-next=not-callable
-        if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
-            start = max(start, process_timestamp(last_run) + timedelta(minutes=5))
-
-        # Add tasks
-        while start < last_period:
-            end = start + timedelta(minutes=5)
-            _LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
-            self.queue_task(StatisticsTask(start, end >= last_period))
-            start = end
+        self.queue_task(CompileMissingStatisticsTask())
 
     def _end_session(self) -> None:
         """End the recorder session."""
diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py
index 36874869bf9..fcd934270d1 100644
--- a/homeassistant/components/recorder/statistics.py
+++ b/homeassistant/components/recorder/statistics.py
@@ -72,6 +72,7 @@ from .models import (
     StatisticMetaData,
     StatisticResult,
     datetime_to_timestamp_or_none,
+    process_timestamp,
 )
 from .util import (
     database_job_retry_wrapper,
@@ -685,69 +686,125 @@ def _compile_hourly_statistics(session: Session, start: datetime) -> None:
     )
 
 
-@retryable_database_job("statistics")
+@retryable_database_job("compile missing statistics")
+def compile_missing_statistics(instance: Recorder) -> bool:
+    """Compile missing statistics."""
+    now = dt_util.utcnow()
+    period_size = 5
+    last_period_minutes = now.minute - now.minute % period_size
+    last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0)
+    start = now - timedelta(days=instance.keep_days)
+    start = start.replace(minute=0, second=0, microsecond=0)
+    # Commit every 12 hours of data
+    commit_interval = 60 / period_size * 12
+
+    with session_scope(
+        session=instance.get_session(),
+        exception_filter=_filter_unique_constraint_integrity_error(instance),
+    ) as session:
+        # Find the newest statistics run, if any
+        # https://github.com/sqlalchemy/sqlalchemy/issues/9189
+        # pylint: disable-next=not-callable
+        if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
+            start = max(start, process_timestamp(last_run) + timedelta(minutes=5))
+
+        periods_without_commit = 0
+        while start < last_period:
+            periods_without_commit += 1
+            end = start + timedelta(minutes=period_size)
+            _LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
+            metadata_modified = _compile_statistics(
+                instance, session, start, end >= last_period
+            )
+            if periods_without_commit == commit_interval or metadata_modified:
+                session.commit()
+                session.expunge_all()
+                periods_without_commit = 0
+            start = end
+
+    return True
+
+
+@retryable_database_job("compile statistics")
 def compile_statistics(instance: Recorder, start: datetime, fire_events: bool) -> bool:
     """Compile 5-minute statistics for all integrations with a recorder platform.
 
     The actual calculation is delegated to the platforms.
     """
-    start = dt_util.as_utc(start)
-    end = start + timedelta(minutes=5)
-    statistics_meta_manager = instance.statistics_meta_manager
-
     # Return if we already have 5-minute statistics for the requested period
     with session_scope(
         session=instance.get_session(),
         exception_filter=_filter_unique_constraint_integrity_error(instance),
     ) as session:
-        if session.query(StatisticsRuns).filter_by(start=start).first():
-            _LOGGER.debug("Statistics already compiled for %s-%s", start, end)
-            return True
+        _compile_statistics(instance, session, start, fire_events)
+    return True
 
-        _LOGGER.debug("Compiling statistics for %s-%s", start, end)
-        platform_stats: list[StatisticResult] = []
-        current_metadata: dict[str, tuple[int, StatisticMetaData]] = {}
-        # Collect statistics from all platforms implementing support
-        for domain, platform in instance.hass.data[DOMAIN].recorder_platforms.items():
-            if not hasattr(platform, "compile_statistics"):
-                continue
-            compiled: PlatformCompiledStatistics = platform.compile_statistics(
-                instance.hass, start, end
-            )
-            _LOGGER.debug(
-                "Statistics for %s during %s-%s: %s",
-                domain,
-                start,
-                end,
-                compiled.platform_stats,
-            )
-            platform_stats.extend(compiled.platform_stats)
-            current_metadata.update(compiled.current_metadata)
 
-        # Insert collected statistics in the database
-        for stats in platform_stats:
-            _, metadata_id = statistics_meta_manager.update_or_add(
-                session, stats["meta"], current_metadata
-            )
-            _insert_statistics(
-                session,
-                StatisticsShortTerm,
-                metadata_id,
-                stats["stat"],
-            )
+def _compile_statistics(
+    instance: Recorder, session: Session, start: datetime, fire_events: bool
+) -> bool:
+    """Compile 5-minute statistics for all integrations with a recorder platform.
 
-        if start.minute == 55:
-            # A full hour is ready, summarize it
-            _compile_hourly_statistics(session, start)
+    This is a helper function for compile_statistics and compile_missing_statistics
+    that does not retry on database errors since both callers already retry.
 
-        session.add(StatisticsRuns(start=start))
+    returns True if metadata was modified, False otherwise
+    """
+    assert start.tzinfo == dt_util.UTC, "start must be in UTC"
+    end = start + timedelta(minutes=5)
+    statistics_meta_manager = instance.statistics_meta_manager
+    metadata_modified = False
+
+    # Return if we already have 5-minute statistics for the requested period
+    if session.query(StatisticsRuns).filter_by(start=start).first():
+        _LOGGER.debug("Statistics already compiled for %s-%s", start, end)
+        return metadata_modified
+
+    _LOGGER.debug("Compiling statistics for %s-%s", start, end)
+    platform_stats: list[StatisticResult] = []
+    current_metadata: dict[str, tuple[int, StatisticMetaData]] = {}
+    # Collect statistics from all platforms implementing support
+    for domain, platform in instance.hass.data[DOMAIN].recorder_platforms.items():
+        if not hasattr(platform, "compile_statistics"):
+            continue
+        compiled: PlatformCompiledStatistics = platform.compile_statistics(
+            instance.hass, start, end
+        )
+        _LOGGER.debug(
+            "Statistics for %s during %s-%s: %s",
+            domain,
+            start,
+            end,
+            compiled.platform_stats,
+        )
+        platform_stats.extend(compiled.platform_stats)
+        current_metadata.update(compiled.current_metadata)
+
+    # Insert collected statistics in the database
+    for stats in platform_stats:
+        updated, metadata_id = statistics_meta_manager.update_or_add(
+            session, stats["meta"], current_metadata
+        )
+        metadata_modified |= updated
+        _insert_statistics(
+            session,
+            StatisticsShortTerm,
+            metadata_id,
+            stats["stat"],
+        )
+
+    if start.minute == 55:
+        # A full hour is ready, summarize it
+        _compile_hourly_statistics(session, start)
+
+    session.add(StatisticsRuns(start=start))
 
     if fire_events:
         instance.hass.bus.fire(EVENT_RECORDER_5MIN_STATISTICS_GENERATED)
         if start.minute == 55:
             instance.hass.bus.fire(EVENT_RECORDER_HOURLY_STATISTICS_GENERATED)
 
-    return True
+    return metadata_modified
 
 
 def _adjust_sum_statistics(
diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py
index f2ba42bdea7..5762a9ab69c 100644
--- a/homeassistant/components/recorder/tasks.py
+++ b/homeassistant/components/recorder/tasks.py
@@ -151,6 +151,18 @@ class StatisticsTask(RecorderTask):
         instance.queue_task(StatisticsTask(self.start, self.fire_events))
 
 
+@dataclass
+class CompileMissingStatisticsTask(RecorderTask):
+    """An object to insert into the recorder queue to run a compile missing statistics."""
+
+    def run(self, instance: Recorder) -> None:
+        """Run statistics task to compile missing statistics."""
+        if statistics.compile_missing_statistics(instance):
+            return
+        # Schedule a new statistics task if this one didn't finish
+        instance.queue_task(CompileMissingStatisticsTask())
+
+
 @dataclass
 class ImportStatisticsTask(RecorderTask):
     """An object to insert into the recorder queue to run an import statistics task."""
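Note on the new task's retry behavior: compile_missing_statistics is wrapped in @retryable_database_job, which returns False rather than raising when a retryable database error is hit, and CompileMissingStatisticsTask.run then queues a fresh task so the catch-up work resumes on a later pass through the recorder queue. Committing in batches inside the loop (every 12 hours' worth of 5-minute periods, or immediately when statistics metadata was modified) avoids both one huge transaction and a commit per period. Below is a minimal, self-contained sketch of the self-rescheduling queue pattern; flaky_job and SelfReschedulingTask are illustrative stand-ins, not Home Assistant APIs.

from dataclasses import dataclass, field
from queue import SimpleQueue

task_queue: SimpleQueue = SimpleQueue()


def flaky_job(state: dict) -> bool:
    """Stand-in for compile_missing_statistics: fails twice, then succeeds."""
    state["calls"] += 1
    return state["calls"] >= 3


@dataclass
class SelfReschedulingTask:
    """Re-queues itself until the underlying job reports success."""

    state: dict = field(default_factory=lambda: {"calls": 0})

    def run(self) -> None:
        if flaky_job(self.state):
            return  # Job finished; nothing more to do.
        # Job didn't finish (e.g. a transient database error):
        # schedule another attempt instead of blocking the queue.
        task_queue.put(SelfReschedulingTask(self.state))


task_queue.put(SelfReschedulingTask())
while not task_queue.empty():
    task_queue.get().run()  # Queue drains once the third attempt succeeds.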