From 64393b462d5213f02e218f27de3f9322586919f1 Mon Sep 17 00:00:00 2001 From: Erik Montnemery Date: Fri, 24 Sep 2021 09:19:22 +0200 Subject: [PATCH] Add migration for 5-minute statistics (#56585) * Add migration for 5-minute statistics * Tweaks --- .../components/recorder/migration.py | 47 +++++++++++++++++-- tests/components/recorder/test_migrate.py | 2 +- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 3ceac8903a7..1a1f978be59 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -1,9 +1,10 @@ """Schema migration helpers.""" import contextlib +from datetime import timedelta import logging import sqlalchemy -from sqlalchemy import ForeignKeyConstraint, MetaData, Table, text +from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text from sqlalchemy.exc import ( InternalError, OperationalError, @@ -21,8 +22,9 @@ from .models import ( StatisticsMeta, StatisticsRuns, StatisticsShortTerm, + process_timestamp, ) -from .statistics import get_start_time +from .statistics import _get_metadata, get_start_time from .util import session_scope _LOGGER = logging.getLogger(__name__) @@ -72,7 +74,7 @@ def migrate_schema(instance, current_version): for version in range(current_version, SCHEMA_VERSION): new_version = version + 1 _LOGGER.info("Upgrading recorder db schema to version %s", new_version) - _apply_update(instance.engine, session, new_version, current_version) + _apply_update(instance, session, new_version, current_version) session.add(SchemaChanges(schema_version=new_version)) _LOGGER.info("Upgrade to version %s done", new_version) @@ -351,8 +353,9 @@ def _drop_foreign_key_constraints(connection, engine, table, columns): ) -def _apply_update(engine, session, new_version, old_version): # noqa: C901 +def _apply_update(instance, session, new_version, old_version): # noqa: C901 """Perform operations to bring schema up to date.""" + engine = instance.engine connection = session.connection() if new_version == 1: _create_index(connection, "events", "ix_events_time_fired") @@ -543,6 +546,42 @@ def _apply_update(engine, session, new_version, old_version): # noqa: C901 StatisticsMeta.__table__.create(engine) StatisticsShortTerm.__table__.create(engine) Statistics.__table__.create(engine) + + # Block 5-minute statistics for one hour from the last run, or it will overlap + # with existing hourly statistics. Don't block on a database with no existing + # statistics. + if session.query(Statistics.id).count() and ( + last_run_string := session.query(func.max(StatisticsRuns.start)).scalar() + ): + last_run_start_time = process_timestamp(last_run_string) + if last_run_start_time: + fake_start_time = last_run_start_time + timedelta(minutes=5) + while fake_start_time < last_run_start_time + timedelta(hours=1): + session.add(StatisticsRuns(start=fake_start_time)) + fake_start_time += timedelta(minutes=5) + + # Copy last hourly statistic to the newly created 5-minute statistics table + sum_statistics = _get_metadata( + instance.hass, session, None, statistic_type="sum" + ) + for metadata_id in sum_statistics: + last_statistic = ( + session.query(Statistics) + .filter_by(metadata_id=metadata_id) + .order_by(Statistics.start.desc()) + .first() + ) + if last_statistic: + session.add( + StatisticsShortTerm( + metadata_id=last_statistic.metadata_id, + start=last_statistic.start, + last_reset=last_statistic.last_reset, + state=last_statistic.state, + sum=last_statistic.sum, + sum_increase=last_statistic.sum_increase, + ) + ) else: raise ValueError(f"No schema migration defined for version {new_version}") diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index ae7510ee979..42122983007 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -65,7 +65,7 @@ async def test_schema_update_calls(hass): assert await recorder.async_migration_in_progress(hass) is False update.assert_has_calls( [ - call(hass.data[DATA_INSTANCE].engine, ANY, version + 1, 0) + call(hass.data[DATA_INSTANCE], ANY, version + 1, 0) for version in range(0, models.SCHEMA_VERSION) ] )