Reduce number of tasks created by compiling missing statistics (#89835)

J. Nick Koston 2023-03-16 21:07:14 -10:00 committed by GitHub
parent d671d7fc1f
commit dbb2706c76
3 changed files with 118 additions and 74 deletions
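
The core of the change: before this commit the recorder queued one StatisticsTask per missed 5-minute window at startup, so a long outage could flood the recorder queue with thousands of tasks; after it, startup queues a single CompileMissingStatisticsTask that walks the missing windows itself inside one database session. A minimal, self-contained sketch of the difference in queue pressure (toy queue and names, not the recorder's actual API):

    from datetime import datetime, timedelta, timezone
    from queue import SimpleQueue

    PERIOD = timedelta(minutes=5)

    def tasks_queued_before(last_run: datetime, now: datetime) -> int:
        """Old behavior: one queued task per missing 5-minute window."""
        queue: SimpleQueue = SimpleQueue()
        start = last_run + PERIOD
        while start < now:
            queue.put(("StatisticsTask", start))  # one task per window
            start += PERIOD
        return queue.qsize()

    def tasks_queued_after(last_run: datetime, now: datetime) -> int:
        """New behavior: a single task that iterates the windows internally."""
        queue: SimpleQueue = SimpleQueue()
        queue.put("CompileMissingStatisticsTask")  # one task, regardless of gap
        return queue.qsize()

    now = datetime(2023, 3, 17, tzinfo=timezone.utc)
    week_ago = now - timedelta(days=7)
    # A week of downtime: 2016 five-minute windows -> 2016 tasks before, 1 after
    print(tasks_queued_before(week_ago, now), tasks_queued_after(week_ago, now))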

homeassistant/components/recorder/core.py

@@ -14,7 +14,7 @@ import time
 from typing import Any, TypeVar

 import async_timeout
-from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select
+from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
 from sqlalchemy.engine import Engine
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.orm import scoped_session, sessionmaker
@@ -62,17 +62,10 @@ from .db_schema import (
     States,
     StatesMeta,
     Statistics,
-    StatisticsRuns,
     StatisticsShortTerm,
 )
 from .executor import DBInterruptibleThreadPoolExecutor
-from .models import (
-    DatabaseEngine,
-    StatisticData,
-    StatisticMetaData,
-    UnsupportedDialect,
-    process_timestamp,
-)
+from .models import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect
 from .pool import POOL_SIZE, MutexPool, RecorderPool
 from .queries import (
     has_entity_ids_to_migrate,
@@ -93,6 +86,7 @@ from .tasks import (
     ChangeStatisticsUnitTask,
     ClearStatisticsTask,
     CommitTask,
+    CompileMissingStatisticsTask,
     DatabaseLockTask,
     EntityIDMigrationTask,
     EventsContextIDMigrationTask,
@@ -680,9 +674,7 @@ class Recorder(threading.Thread):
         self._activate_and_set_db_ready()

         # Catch up with missed statistics
-        with session_scope(session=self.get_session()) as session:
-            self._schedule_compile_missing_statistics(session)
+        self._schedule_compile_missing_statistics()
         _LOGGER.debug("Recorder processing the queue")
         self._adjust_lru_size()
         self.hass.add_job(self._async_set_recorder_ready_migration_done)
@@ -1295,26 +1287,9 @@ class Recorder(threading.Thread):
         self._open_event_session()

-    def _schedule_compile_missing_statistics(self, session: Session) -> None:
+    def _schedule_compile_missing_statistics(self) -> None:
         """Add tasks for missing statistics runs."""
-        now = dt_util.utcnow()
-        last_period_minutes = now.minute - now.minute % 5
-        last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0)
-        start = now - timedelta(days=self.keep_days)
-        start = start.replace(minute=0, second=0, microsecond=0)
-        # Find the newest statistics run, if any
-        # https://github.com/sqlalchemy/sqlalchemy/issues/9189
-        # pylint: disable-next=not-callable
-        if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
-            start = max(start, process_timestamp(last_run) + timedelta(minutes=5))
-        # Add tasks
-        while start < last_period:
-            end = start + timedelta(minutes=5)
-            _LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
-            self.queue_task(StatisticsTask(start, end >= last_period))
-            start = end
+        self.queue_task(CompileMissingStatisticsTask())

     def _end_session(self) -> None:
         """End the recorder session."""

homeassistant/components/recorder/statistics.py

@@ -72,6 +72,7 @@ from .models import (
     StatisticMetaData,
     StatisticResult,
     datetime_to_timestamp_or_none,
+    process_timestamp,
 )
 from .util import (
     database_job_retry_wrapper,
@@ -685,69 +686,125 @@ def _compile_hourly_statistics(session: Session, start: datetime) -> None:
     )


-@retryable_database_job("statistics")
+@retryable_database_job("compile missing statistics")
+def compile_missing_statistics(instance: Recorder) -> bool:
+    """Compile missing statistics."""
+    now = dt_util.utcnow()
+    period_size = 5
+    last_period_minutes = now.minute - now.minute % period_size
+    last_period = now.replace(minute=last_period_minutes, second=0, microsecond=0)
+    start = now - timedelta(days=instance.keep_days)
+    start = start.replace(minute=0, second=0, microsecond=0)
+    # Commit every 12 hours of data
+    commit_interval = 60 / period_size * 12
+    with session_scope(
+        session=instance.get_session(),
+        exception_filter=_filter_unique_constraint_integrity_error(instance),
+    ) as session:
+        # Find the newest statistics run, if any
+        # https://github.com/sqlalchemy/sqlalchemy/issues/9189
+        # pylint: disable-next=not-callable
+        if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
+            start = max(start, process_timestamp(last_run) + timedelta(minutes=5))
+
+        periods_without_commit = 0
+        while start < last_period:
+            periods_without_commit += 1
+            end = start + timedelta(minutes=period_size)
+            _LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
+            metadata_modified = _compile_statistics(
+                instance, session, start, end >= last_period
+            )
+            if periods_without_commit == commit_interval or metadata_modified:
+                session.commit()
+                session.expunge_all()
+                periods_without_commit = 0
+            start = end
+    return True
+
+
+@retryable_database_job("compile statistics")
 def compile_statistics(instance: Recorder, start: datetime, fire_events: bool) -> bool:
     """Compile 5-minute statistics for all integrations with a recorder platform.

     The actual calculation is delegated to the platforms.
     """
     start = dt_util.as_utc(start)
-    end = start + timedelta(minutes=5)
-    statistics_meta_manager = instance.statistics_meta_manager

-    # Return if we already have 5-minute statistics for the requested period
     with session_scope(
         session=instance.get_session(),
         exception_filter=_filter_unique_constraint_integrity_error(instance),
     ) as session:
-        if session.query(StatisticsRuns).filter_by(start=start).first():
-            _LOGGER.debug("Statistics already compiled for %s-%s", start, end)
-            return True
+        _compile_statistics(instance, session, start, fire_events)
+    return True

-        _LOGGER.debug("Compiling statistics for %s-%s", start, end)

-        platform_stats: list[StatisticResult] = []
-        current_metadata: dict[str, tuple[int, StatisticMetaData]] = {}
-        # Collect statistics from all platforms implementing support
-        for domain, platform in instance.hass.data[DOMAIN].recorder_platforms.items():
-            if not hasattr(platform, "compile_statistics"):
-                continue
-            compiled: PlatformCompiledStatistics = platform.compile_statistics(
-                instance.hass, start, end
-            )
-            _LOGGER.debug(
-                "Statistics for %s during %s-%s: %s",
-                domain,
-                start,
-                end,
-                compiled.platform_stats,
-            )
-            platform_stats.extend(compiled.platform_stats)
-            current_metadata.update(compiled.current_metadata)

-        # Insert collected statistics in the database
-        for stats in platform_stats:
-            _, metadata_id = statistics_meta_manager.update_or_add(
-                session, stats["meta"], current_metadata
-            )
-            _insert_statistics(
-                session,
-                StatisticsShortTerm,
-                metadata_id,
-                stats["stat"],
-            )
+def _compile_statistics(
+    instance: Recorder, session: Session, start: datetime, fire_events: bool
+) -> bool:
+    """Compile 5-minute statistics for all integrations with a recorder platform.

-        if start.minute == 55:
-            # A full hour is ready, summarize it
-            _compile_hourly_statistics(session, start)
+    This is a helper function for compile_statistics and compile_missing_statistics
+    that does not retry on database errors since both callers already retry.

-        session.add(StatisticsRuns(start=start))
+    returns True if metadata was modified, False otherwise
+    """
+    assert start.tzinfo == dt_util.UTC, "start must be in UTC"
+    end = start + timedelta(minutes=5)
+    statistics_meta_manager = instance.statistics_meta_manager
+    metadata_modified = False
+
+    # Return if we already have 5-minute statistics for the requested period
+    if session.query(StatisticsRuns).filter_by(start=start).first():
+        _LOGGER.debug("Statistics already compiled for %s-%s", start, end)
+        return metadata_modified
+
+    _LOGGER.debug("Compiling statistics for %s-%s", start, end)
+
+    platform_stats: list[StatisticResult] = []
+    current_metadata: dict[str, tuple[int, StatisticMetaData]] = {}
+    # Collect statistics from all platforms implementing support
+    for domain, platform in instance.hass.data[DOMAIN].recorder_platforms.items():
+        if not hasattr(platform, "compile_statistics"):
+            continue
+        compiled: PlatformCompiledStatistics = platform.compile_statistics(
+            instance.hass, start, end
+        )
+        _LOGGER.debug(
+            "Statistics for %s during %s-%s: %s",
+            domain,
+            start,
+            end,
+            compiled.platform_stats,
+        )
+        platform_stats.extend(compiled.platform_stats)
+        current_metadata.update(compiled.current_metadata)
+
+    # Insert collected statistics in the database
+    for stats in platform_stats:
+        updated, metadata_id = statistics_meta_manager.update_or_add(
+            session, stats["meta"], current_metadata
+        )
+        metadata_modified |= updated
+        _insert_statistics(
+            session,
+            StatisticsShortTerm,
+            metadata_id,
+            stats["stat"],
+        )
+
+    if start.minute == 55:
+        # A full hour is ready, summarize it
+        _compile_hourly_statistics(session, start)
+
+    session.add(StatisticsRuns(start=start))

     if fire_events:
         instance.hass.bus.fire(EVENT_RECORDER_5MIN_STATISTICS_GENERATED)
         if start.minute == 55:
             instance.hass.bus.fire(EVENT_RECORDER_HOURLY_STATISTICS_GENERATED)

-    return True
+    return metadata_modified


 def _adjust_sum_statistics(
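
A note on the batching arithmetic in compile_missing_statistics above: with period_size = 5, the loop commits and expunges the session once every commit_interval = 60 / 5 * 12 = 144 five-minute windows, i.e. once per 12 hours of recovered data, and commits early whenever a platform modified statistics metadata so later windows see the flushed rows. A worked check of the values taken from the diff:

    period_size = 5                          # minutes per statistics window
    commit_interval = 60 / period_size * 12  # windows between forced commits
    assert commit_interval == 144            # 144 windows * 5 min = 12 hours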

homeassistant/components/recorder/tasks.py

@@ -151,6 +151,18 @@ class StatisticsTask(RecorderTask):
         instance.queue_task(StatisticsTask(self.start, self.fire_events))


+@dataclass
+class CompileMissingStatisticsTask(RecorderTask):
+    """An object to insert into the recorder queue to run a compile missing statistics."""
+
+    def run(self, instance: Recorder) -> None:
+        """Run statistics task to compile missing statistics."""
+        if statistics.compile_missing_statistics(instance):
+            return
+        # Schedule a new statistics task if this one didn't finish
+        instance.queue_task(CompileMissingStatisticsTask())
+
+
 @dataclass
 class ImportStatisticsTask(RecorderTask):
     """An object to insert into the recorder queue to run an import statistics task."""