diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index e6c15729d24..897a4eb3c94 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -11,7 +11,7 @@ import threading import time from typing import Any, Callable, NamedTuple -from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select +from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.pool import StaticPool @@ -50,7 +50,14 @@ import homeassistant.util.dt as dt_util from . import history, migration, purge, statistics from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX -from .models import Base, Events, RecorderRuns, States +from .models import ( + Base, + Events, + RecorderRuns, + States, + StatisticsRuns, + process_timestamp, +) from .pool import RecorderPool from .util import ( dburl_to_path, @@ -567,11 +574,35 @@ class Recorder(threading.Thread): async_track_time_change( self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0 ) + # Compile hourly statistics every hour at *:12 async_track_time_change( self.hass, self.async_hourly_statistics, minute=12, second=0 ) + # Add tasks for missing statistics runs + now = dt_util.utcnow() + last_hour = now.replace(minute=0, second=0, microsecond=0) + start = now - timedelta(days=self.keep_days) + start = start.replace(minute=0, second=0, microsecond=0) + + if not self.get_session: + # Home Assistant is shutting down + return + + # Find the newest statistics run, if any + with session_scope(session=self.get_session()) as session: + last_run = session.query(func.max(StatisticsRuns.start)).scalar() + if last_run: + start = max(start, process_timestamp(last_run) + timedelta(hours=1)) + + # Add tasks + while start < last_hour: + end = start + timedelta(hours=1) 
+ _LOGGER.debug("Compiling missing statistics for %s-%s", start, end) + self.queue.put(StatisticsTask(start)) + start = start + timedelta(hours=1) + def run(self): """Start processing events to save.""" shutdown_task = object() @@ -606,7 +637,7 @@ class Recorder(threading.Thread): if not schema_is_current: if self._migrate_schema_and_setup_run(current_version): if not self._event_listener: - # If the schema migration takes so longer that the end + # If the schema migration takes so long that the end # queue watcher safety kicks in because MAX_QUEUE_BACKLOG # is reached, we need to reinitialize the listener. self.hass.add_job(self.async_initialize) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 06391f2864d..211e1646cca 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -1,4 +1,5 @@ """Schema migration helpers.""" +from datetime import timedelta import logging import sqlalchemy @@ -11,6 +12,8 @@ from sqlalchemy.exc import ( ) from sqlalchemy.schema import AddConstraint, DropConstraint +import homeassistant.util.dt as dt_util + from .models import ( SCHEMA_VERSION, TABLE_STATES, @@ -18,6 +21,7 @@ from .models import ( SchemaChanges, Statistics, StatisticsMeta, + StatisticsRuns, ) from .util import session_scope @@ -475,6 +479,13 @@ def _apply_update(engine, session, new_version, old_version): StatisticsMeta.__table__.create(engine) Statistics.__table__.create(engine) + elif new_version == 19: + # This adds the statistic runs table, insert a fake run to prevent duplicating + # statistics. 
+ now = dt_util.utcnow() + start = now.replace(minute=0, second=0, microsecond=0) + start = start - timedelta(hours=1) + session.add(StatisticsRuns(start=start)) else: raise ValueError(f"No schema migration defined for version {new_version}") @@ -494,6 +505,10 @@ def _inspect_schema_version(engine, session): for index in indexes: if index["column_names"] == ["time_fired"]: # Schema addition from version 1 detected. New DB. + now = dt_util.utcnow() + start = now.replace(minute=0, second=0, microsecond=0) + start = start - timedelta(hours=1) + session.add(StatisticsRuns(start=start)) session.add(SchemaChanges(schema_version=SCHEMA_VERSION)) return SCHEMA_VERSION diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index fe75ba1cb50..1c56e9c8f79 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -39,7 +39,7 @@ import homeassistant.util.dt as dt_util # pylint: disable=invalid-name Base = declarative_base() -SCHEMA_VERSION = 18 +SCHEMA_VERSION = 19 _LOGGER = logging.getLogger(__name__) @@ -51,6 +51,7 @@ TABLE_RECORDER_RUNS = "recorder_runs" TABLE_SCHEMA_CHANGES = "schema_changes" TABLE_STATISTICS = "statistics" TABLE_STATISTICS_META = "statistics_meta" +TABLE_STATISTICS_RUNS = "statistics_runs" ALL_TABLES = [ TABLE_STATES, @@ -59,6 +60,7 @@ ALL_TABLES = [ TABLE_SCHEMA_CHANGES, TABLE_STATISTICS, TABLE_STATISTICS_META, + TABLE_STATISTICS_RUNS, ] DATETIME_TYPE = DateTime(timezone=True).with_variant( @@ -110,7 +112,7 @@ class Events(Base): # type: ignore ) def to_native(self, validate_entity_id=True): - """Convert to a natve HA Event.""" + """Convert to a native HA Event.""" context = Context( id=self.context_id, user_id=self.context_user_id, @@ -359,6 +361,22 @@ class SchemaChanges(Base): # type: ignore ) +class StatisticsRuns(Base): # type: ignore + """Representation of statistics run.""" + + __tablename__ = TABLE_STATISTICS_RUNS + run_id = Column(Integer, 
primary_key=True) + start = Column(DateTime(timezone=True)) + + def __repr__(self) -> str: + """Return string representation of instance for debugging.""" + return ( + f"<recorder.StatisticsRuns(id={self.run_id}, start='{self.start.isoformat(sep=' ', timespec='seconds')}')>" + ) + + def process_timestamp(ts): """Process a timestamp into datetime object.""" if ts is None: diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py index 6017f050419..f8a9e3a6c89 100644 --- a/homeassistant/components/recorder/statistics.py +++ b/homeassistant/components/recorder/statistics.py @@ -30,6 +30,7 @@ from .models import ( StatisticMetaData, Statistics, StatisticsMeta, + StatisticsRuns, process_timestamp_to_utc_isoformat, ) from .util import execute, retryable_database_job, session_scope @@ -156,6 +157,12 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool: """Compile statistics.""" start = dt_util.as_utc(start) end = start + timedelta(hours=1) + + with session_scope(session=instance.get_session()) as session: # type: ignore + if session.query(StatisticsRuns).filter_by(start=start).first(): + _LOGGER.debug("Statistics already compiled for %s-%s", start, end) + return True + _LOGGER.debug("Compiling statistics for %s-%s", start, end) platform_stats = [] for domain, platform in instance.hass.data[DOMAIN].items(): @@ -173,6 +180,7 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool: instance.hass, session, entity_id, stat["meta"] ) session.add(Statistics.from_stats(metadata_id, start, stat["stat"])) + session.add(StatisticsRuns(start=start)) return True diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index e3af39b217a..f492b754125 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -24,6 +24,7 @@ from .models import ( TABLE_SCHEMA_CHANGES, TABLE_STATISTICS, TABLE_STATISTICS_META, + TABLE_STATISTICS_RUNS, RecorderRuns, process_timestamp, ) @@ -183,7 +184,8 @@ def basic_sanity_check(cursor): 
"""Check tables to make sure select does not fail.""" for table in ALL_TABLES: - if table in [TABLE_STATISTICS, TABLE_STATISTICS_META]: + # The statistics tables may not be present in old databases + if table in [TABLE_STATISTICS, TABLE_STATISTICS_META, TABLE_STATISTICS_RUNS]: continue if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES): cursor.execute(f"SELECT * FROM {table};") # nosec # not injection diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 195e56dc748..fa0e8b7349b 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -25,7 +25,13 @@ from homeassistant.components.recorder import ( run_information_with_session, ) from homeassistant.components.recorder.const import DATA_INSTANCE -from homeassistant.components.recorder.models import Events, RecorderRuns, States +from homeassistant.components.recorder.models import ( + Events, + RecorderRuns, + States, + StatisticsRuns, + process_timestamp, +) from homeassistant.components.recorder.util import session_scope from homeassistant.const import ( EVENT_HOMEASSISTANT_FINAL_WRITE, @@ -735,6 +741,69 @@ def test_auto_statistics(hass_recorder): dt_util.set_default_time_zone(original_tz) +def test_statistics_runs_initiated(hass_recorder): + """Test statistics_runs is initiated when DB is created.""" + now = dt_util.utcnow() + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now): + hass = hass_recorder() + + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + statistics_runs = list(session.query(StatisticsRuns)) + assert len(statistics_runs) == 1 + last_run = process_timestamp(statistics_runs[0].start) + assert process_timestamp(last_run) == now.replace( + minute=0, second=0, microsecond=0 + ) - timedelta(hours=1) + + +def test_compile_missing_statistics(tmpdir): + """Test missing statistics are compiled on startup.""" + now = dt_util.utcnow().replace(minute=0, second=0, 
microsecond=0) + test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") + dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" + + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=now): + + hass = get_test_home_assistant() + setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}}) + hass.start() + wait_recording_done(hass) + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + statistics_runs = list(session.query(StatisticsRuns)) + assert len(statistics_runs) == 1 + last_run = process_timestamp(statistics_runs[0].start) + assert last_run == now - timedelta(hours=1) + + wait_recording_done(hass) + wait_recording_done(hass) + hass.stop() + + with patch( + "homeassistant.components.recorder.dt_util.utcnow", + return_value=now + timedelta(hours=1), + ): + + hass = get_test_home_assistant() + setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}}) + hass.start() + wait_recording_done(hass) + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + statistics_runs = list(session.query(StatisticsRuns)) + assert len(statistics_runs) == 2 + last_run = process_timestamp(statistics_runs[1].start) + assert last_run == now + + wait_recording_done(hass) + wait_recording_done(hass) + hass.stop() + + def test_saving_sets_old_state(hass_recorder): """Test saving sets old state.""" hass = hass_recorder() diff --git a/tests/components/recorder/test_statistics.py b/tests/components/recorder/test_statistics.py index 83995b0c0ac..995ad537ab4 100644 --- a/tests/components/recorder/test_statistics.py +++ b/tests/components/recorder/test_statistics.py @@ -148,6 +148,39 @@ def test_rename_entity(hass_recorder): assert stats == {"sensor.test99": expected_stats99, "sensor.test2": expected_stats2} +def test_statistics_duplicated(hass_recorder, caplog): + """Test statistics with same start time is not compiled.""" + hass = hass_recorder() + recorder = hass.data[DATA_INSTANCE] + setup_component(hass, "sensor", {}) + 
zero, four, states = record_states(hass) + hist = history.get_significant_states(hass, zero, four) + assert dict(states) == dict(hist) + + wait_recording_done(hass) + assert "Compiling statistics for" not in caplog.text + assert "Statistics already compiled" not in caplog.text + + with patch( + "homeassistant.components.sensor.recorder.compile_statistics" + ) as compile_statistics: + recorder.do_adhoc_statistics(period="hourly", start=zero) + wait_recording_done(hass) + assert compile_statistics.called + compile_statistics.reset_mock() + assert "Compiling statistics for" in caplog.text + assert "Statistics already compiled" not in caplog.text + caplog.clear() + + recorder.do_adhoc_statistics(period="hourly", start=zero) + wait_recording_done(hass) + assert not compile_statistics.called + compile_statistics.reset_mock() + assert "Compiling statistics for" not in caplog.text + assert "Statistics already compiled" in caplog.text + caplog.clear() + + def record_states(hass): """Record some test states.