Compile missing statistics (#54690)

Erik Montnemery 2021-08-20 07:10:45 +02:00 committed by GitHub
parent 036e99e91e
commit 32e297f4a0
7 changed files with 183 additions and 7 deletions

homeassistant/components/recorder/__init__.py

@@ -11,7 +11,7 @@ import threading
import time
from typing import Any, Callable, NamedTuple
-from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
+from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import StaticPool
@@ -50,7 +50,14 @@ import homeassistant.util.dt as dt_util
from . import history, migration, purge, statistics
from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX
-from .models import Base, Events, RecorderRuns, States
+from .models import (
+    Base,
+    Events,
+    RecorderRuns,
+    States,
+    StatisticsRuns,
+    process_timestamp,
+)
from .pool import RecorderPool
from .util import (
dburl_to_path,
@@ -567,11 +574,35 @@ class Recorder(threading.Thread):
async_track_time_change(
self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0
)
# Compile hourly statistics every hour at *:12
async_track_time_change(
self.hass, self.async_hourly_statistics, minute=12, second=0
)
# Add tasks for missing statistics runs
now = dt_util.utcnow()
last_hour = now.replace(minute=0, second=0, microsecond=0)
start = now - timedelta(days=self.keep_days)
start = start.replace(minute=0, second=0, microsecond=0)
if not self.get_session:
# Home Assistant is shutting down
return
# Find the newest statistics run, if any
with session_scope(session=self.get_session()) as session:
last_run = session.query(func.max(StatisticsRuns.start)).scalar()
if last_run:
start = max(start, process_timestamp(last_run) + timedelta(hours=1))
# Add tasks
while start < last_hour:
end = start + timedelta(hours=1)
_LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
self.queue.put(StatisticsTask(start))
start = start + timedelta(hours=1)
def run(self):
"""Start processing events to save."""
shutdown_task = object()
@@ -606,7 +637,7 @@
if not schema_is_current:
if self._migrate_schema_and_setup_run(current_version):
if not self._event_listener:
-                    # If the schema migration takes so longer that the end
+                    # If the schema migration takes so long that the end
# queue watcher safety kicks in because MAX_QUEUE_BACKLOG
# is reached, we need to reinitialize the listener.
self.hass.add_job(self.async_initialize)

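The backfill added above works in three steps: look up the newest row in the new statistics_runs table, clamp the resume point to the retention window (keep_days, since older states have already been purged), then queue one StatisticsTask per fully elapsed hour that has no recorded run. A standalone sketch of that gap computation (plain Python; missing_hours and its arguments are illustrative names, not recorder API):

from datetime import datetime, timedelta, timezone


def missing_hours(now, last_run, keep_days):
    """Yield the start of each fully elapsed hour with no statistics run."""
    last_hour = now.replace(minute=0, second=0, microsecond=0)
    start = (now - timedelta(days=keep_days)).replace(
        minute=0, second=0, microsecond=0
    )
    if last_run is not None:
        # Resume one hour after the newest finished run.
        start = max(start, last_run + timedelta(hours=1))
    while start < last_hour:
        yield start
        start += timedelta(hours=1)


# Recorder restarted at 07:10; the last compiled hour started at 03:00.
now = datetime(2021, 8, 20, 7, 10, tzinfo=timezone.utc)
last_run = datetime(2021, 8, 20, 3, 0, tzinfo=timezone.utc)
assert list(missing_hours(now, last_run, keep_days=10)) == [
    datetime(2021, 8, 20, 4, 0, tzinfo=timezone.utc),
    datetime(2021, 8, 20, 5, 0, tzinfo=timezone.utc),
    datetime(2021, 8, 20, 6, 0, tzinfo=timezone.utc),
]

The hour still in progress (07:00) is deliberately excluded; the regular *:12 schedule compiles it once it has fully elapsed.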
homeassistant/components/recorder/migration.py

@@ -1,4 +1,5 @@
"""Schema migration helpers."""
from datetime import timedelta
import logging
import sqlalchemy
@@ -11,6 +12,8 @@ from sqlalchemy.exc import (
)
from sqlalchemy.schema import AddConstraint, DropConstraint
import homeassistant.util.dt as dt_util
from .models import (
SCHEMA_VERSION,
TABLE_STATES,
@@ -18,6 +21,7 @@ from .models import (
SchemaChanges,
Statistics,
StatisticsMeta,
StatisticsRuns,
)
from .util import session_scope
@@ -475,6 +479,13 @@ def _apply_update(engine, session, new_version, old_version):
StatisticsMeta.__table__.create(engine)
Statistics.__table__.create(engine)
elif new_version == 19:
# This adds the statistics_runs table; insert a fake run to prevent
# duplicating statistics for already-recorded history.
now = dt_util.utcnow()
start = now.replace(minute=0, second=0, microsecond=0)
start = start - timedelta(hours=1)
session.add(StatisticsRuns(start=start))
else:
raise ValueError(f"No schema migration defined for version {new_version}")
@@ -494,6 +505,10 @@ def _inspect_schema_version(engine, session):
for index in indexes:
if index["column_names"] == ["time_fired"]:
# Schema addition from version 1 detected. New DB.
now = dt_util.utcnow()
start = now.replace(minute=0, second=0, microsecond=0)
start = start - timedelta(hours=1)
session.add(StatisticsRuns(start=start))
session.add(SchemaChanges(schema_version=SCHEMA_VERSION))
return SCHEMA_VERSION

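The fake StatisticsRuns row written here, both by the version-19 migration and when a brand-new database is detected, is dated one hour before the current hour. Fed into the backfill logic in the recorder above, that makes the resume point (last run plus one hour) equal to the current hour, so the startup loop queues nothing and statistics are not compiled a second time for history that was already covered before the table existed. Continuing the missing_hours sketch from above:

# A fake run one hour in the past makes the startup backfill a no-op.
now = datetime(2021, 8, 20, 7, 10, tzinfo=timezone.utc)
fake_run = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
assert list(missing_hours(now, fake_run, keep_days=10)) == []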
homeassistant/components/recorder/models.py

@@ -39,7 +39,7 @@ import homeassistant.util.dt as dt_util
# pylint: disable=invalid-name
Base = declarative_base()
-SCHEMA_VERSION = 18
+SCHEMA_VERSION = 19
_LOGGER = logging.getLogger(__name__)
@@ -51,6 +51,7 @@ TABLE_RECORDER_RUNS = "recorder_runs"
TABLE_SCHEMA_CHANGES = "schema_changes"
TABLE_STATISTICS = "statistics"
TABLE_STATISTICS_META = "statistics_meta"
TABLE_STATISTICS_RUNS = "statistics_runs"
ALL_TABLES = [
TABLE_STATES,
@@ -59,6 +60,7 @@ ALL_TABLES = [
TABLE_SCHEMA_CHANGES,
TABLE_STATISTICS,
TABLE_STATISTICS_META,
TABLE_STATISTICS_RUNS,
]
DATETIME_TYPE = DateTime(timezone=True).with_variant(
@@ -110,7 +112,7 @@ class Events(Base): # type: ignore
)
def to_native(self, validate_entity_id=True):
"""Convert to a natve HA Event."""
"""Convert to a native HA Event."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
@@ -359,6 +361,22 @@ class SchemaChanges(Base): # type: ignore
)
class StatisticsRuns(Base): # type: ignore
"""Representation of statistics run."""
__tablename__ = TABLE_STATISTICS_RUNS
run_id = Column(Integer, primary_key=True)
start = Column(DateTime(timezone=True))
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.StatisticsRuns("
f"id={self.run_id}, start='{self.start.isoformat(sep=' ', timespec='seconds')}', "
f")>"
)
def process_timestamp(ts):
"""Process a timestamp into datetime object."""
if ts is None:

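The new table is deliberately minimal: an autoincrement key plus the start of the hour each run covered. For illustration, a sketch of exercising it with plain SQLAlchemy against a throwaway in-memory database (standalone session handling, not Home Assistant's; Base and StatisticsRuns as defined above):

from datetime import datetime

from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite://")  # throwaway in-memory database
Base.metadata.create_all(engine)  # creates statistics_runs (and the rest)
session = sessionmaker(bind=engine)()

session.add(StatisticsRuns(start=datetime(2021, 8, 20, 6, 0)))
session.commit()

# The recorder's startup query: newest compiled hour, or None if no runs.
assert session.query(func.max(StatisticsRuns.start)).scalar() == datetime(
    2021, 8, 20, 6, 0
)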
homeassistant/components/recorder/statistics.py

@@ -30,6 +30,7 @@ from .models import (
StatisticMetaData,
Statistics,
StatisticsMeta,
StatisticsRuns,
process_timestamp_to_utc_isoformat,
)
from .util import execute, retryable_database_job, session_scope
@@ -156,6 +157,12 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
"""Compile statistics."""
start = dt_util.as_utc(start)
end = start + timedelta(hours=1)
with session_scope(session=instance.get_session()) as session: # type: ignore
if session.query(StatisticsRuns).filter_by(start=start).first():
_LOGGER.debug("Statistics already compiled for %s-%s", start, end)
return True
_LOGGER.debug("Compiling statistics for %s-%s", start, end)
platform_stats = []
for domain, platform in instance.hass.data[DOMAIN].items():
@@ -173,6 +180,7 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
instance.hass, session, entity_id, stat["meta"]
)
session.add(Statistics.from_stats(metadata_id, start, stat["stat"]))
session.add(StatisticsRuns(start=start))
return True

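compile_statistics thus becomes idempotent through a check-then-mark pattern: return early if a StatisticsRuns row for this hour already exists, and insert the marker row in the same session, and therefore the same transaction, as the statistics rows themselves, so a crash mid-compile leaves no marker and the hour is retried later. The generic shape of the pattern, as a sketch (run_once_per_period is a made-up helper, not recorder API):

def run_once_per_period(session, start, work):
    """Run work(session) unless a marker row for this period exists."""
    if session.query(StatisticsRuns).filter_by(start=start).first():
        return True  # already compiled, nothing to do
    work(session)
    # Committed together with the work: either both persist or neither does.
    session.add(StatisticsRuns(start=start))
    return True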
homeassistant/components/recorder/util.py

@@ -24,6 +24,7 @@ from .models import (
TABLE_SCHEMA_CHANGES,
TABLE_STATISTICS,
TABLE_STATISTICS_META,
TABLE_STATISTICS_RUNS,
RecorderRuns,
process_timestamp,
)
@@ -183,7 +184,8 @@ def basic_sanity_check(cursor):
"""Check tables to make sure select does not fail."""
for table in ALL_TABLES:
-        if table in [TABLE_STATISTICS, TABLE_STATISTICS_META]:
+        # The statistics tables may not be present in old databases
+        if table in [TABLE_STATISTICS, TABLE_STATISTICS_META, TABLE_STATISTICS_RUNS]:
continue
if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES):
cursor.execute(f"SELECT * FROM {table};") # nosec # not injection

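basic_sanity_check probes each expected table with a SELECT and lets any error propagate; the three statistics tables are skipped because the check may run against a database created before those tables existed, and migration only adds them later. A sketch of the same pattern with the standard-library sqlite3 module (table names as above, function name illustrative):

import sqlite3

OPTIONAL_TABLES = {"statistics", "statistics_meta", "statistics_runs"}


def probe_tables(cursor, all_tables):
    """Raise sqlite3.OperationalError if a required table is unreadable."""
    for table in all_tables:
        if table in OPTIONAL_TABLES:
            # May legitimately be missing in not-yet-migrated databases.
            continue
        cursor.execute(f"SELECT * FROM {table} LIMIT 1;")  # fixed names


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE states (state_id INTEGER PRIMARY KEY)")
probe_tables(conn.cursor(), ["states", "statistics_runs"])  # runs table skipped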
tests/components/recorder/test_init.py

@@ -25,7 +25,13 @@ from homeassistant.components.recorder import (
run_information_with_session,
)
from homeassistant.components.recorder.const import DATA_INSTANCE
-from homeassistant.components.recorder.models import Events, RecorderRuns, States
+from homeassistant.components.recorder.models import (
+    Events,
+    RecorderRuns,
+    States,
+    StatisticsRuns,
+    process_timestamp,
+)
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import (
EVENT_HOMEASSISTANT_FINAL_WRITE,
@@ -735,6 +741,69 @@ def test_auto_statistics(hass_recorder):
dt_util.set_default_time_zone(original_tz)
def test_statistics_runs_initiated(hass_recorder):
"""Test statistics_runs is initiated when DB is created."""
now = dt_util.utcnow()
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now):
hass = hass_recorder()
wait_recording_done(hass)
with session_scope(hass=hass) as session:
statistics_runs = list(session.query(StatisticsRuns))
assert len(statistics_runs) == 1
last_run = process_timestamp(statistics_runs[0].start)
assert process_timestamp(last_run) == now.replace(
minute=0, second=0, microsecond=0
) - timedelta(hours=1)
def test_compile_missing_statistics(tmpdir):
"""Test missing statistics are compiled on startup."""
now = dt_util.utcnow().replace(minute=0, second=0, microsecond=0)
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now):
hass = get_test_home_assistant()
setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
statistics_runs = list(session.query(StatisticsRuns))
assert len(statistics_runs) == 1
last_run = process_timestamp(statistics_runs[0].start)
assert last_run == now - timedelta(hours=1)
wait_recording_done(hass)
wait_recording_done(hass)
hass.stop()
with patch(
"homeassistant.components.recorder.dt_util.utcnow",
return_value=now + timedelta(hours=1),
):
hass = get_test_home_assistant()
setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
statistics_runs = list(session.query(StatisticsRuns))
assert len(statistics_runs) == 2
last_run = process_timestamp(statistics_runs[1].start)
assert last_run == now
wait_recording_done(hass)
wait_recording_done(hass)
hass.stop()
def test_saving_sets_old_state(hass_recorder):
"""Test saving sets old state."""
hass = hass_recorder()

tests/components/recorder/test_statistics.py

@@ -148,6 +148,39 @@ def test_rename_entity(hass_recorder):
assert stats == {"sensor.test99": expected_stats99, "sensor.test2": expected_stats2}
def test_statistics_duplicated(hass_recorder, caplog):
"""Test statistics with same start time is not compiled."""
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
zero, four, states = record_states(hass)
hist = history.get_significant_states(hass, zero, four)
assert dict(states) == dict(hist)
wait_recording_done(hass)
assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" not in caplog.text
with patch(
"homeassistant.components.sensor.recorder.compile_statistics"
) as compile_statistics:
recorder.do_adhoc_statistics(period="hourly", start=zero)
wait_recording_done(hass)
assert compile_statistics.called
compile_statistics.reset_mock()
assert "Compiling statistics for" in caplog.text
assert "Statistics already compiled" not in caplog.text
caplog.clear()
recorder.do_adhoc_statistics(period="hourly", start=zero)
wait_recording_done(hass)
assert not compile_statistics.called
compile_statistics.reset_mock()
assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" in caplog.text
caplog.clear()
def record_states(hass):
"""Record some test states.