Add migration for 5-minute statistics (#56585)

* Add migration for 5-minute statistics

* Tweaks
This commit is contained in:
Erik Montnemery 2021-09-24 09:19:22 +02:00 committed by GitHub
parent 7452998081
commit 64393b462d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 5 deletions

View File

@ -1,9 +1,10 @@
"""Schema migration helpers.""" """Schema migration helpers."""
import contextlib import contextlib
from datetime import timedelta
import logging import logging
import sqlalchemy import sqlalchemy
from sqlalchemy import ForeignKeyConstraint, MetaData, Table, text from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text
from sqlalchemy.exc import ( from sqlalchemy.exc import (
InternalError, InternalError,
OperationalError, OperationalError,
@ -21,8 +22,9 @@ from .models import (
StatisticsMeta, StatisticsMeta,
StatisticsRuns, StatisticsRuns,
StatisticsShortTerm, StatisticsShortTerm,
process_timestamp,
) )
from .statistics import get_start_time from .statistics import _get_metadata, get_start_time
from .util import session_scope from .util import session_scope
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -72,7 +74,7 @@ def migrate_schema(instance, current_version):
for version in range(current_version, SCHEMA_VERSION): for version in range(current_version, SCHEMA_VERSION):
new_version = version + 1 new_version = version + 1
_LOGGER.info("Upgrading recorder db schema to version %s", new_version) _LOGGER.info("Upgrading recorder db schema to version %s", new_version)
_apply_update(instance.engine, session, new_version, current_version) _apply_update(instance, session, new_version, current_version)
session.add(SchemaChanges(schema_version=new_version)) session.add(SchemaChanges(schema_version=new_version))
_LOGGER.info("Upgrade to version %s done", new_version) _LOGGER.info("Upgrade to version %s done", new_version)
@ -351,8 +353,9 @@ def _drop_foreign_key_constraints(connection, engine, table, columns):
) )
def _apply_update(engine, session, new_version, old_version): # noqa: C901 def _apply_update(instance, session, new_version, old_version): # noqa: C901
"""Perform operations to bring schema up to date.""" """Perform operations to bring schema up to date."""
engine = instance.engine
connection = session.connection() connection = session.connection()
if new_version == 1: if new_version == 1:
_create_index(connection, "events", "ix_events_time_fired") _create_index(connection, "events", "ix_events_time_fired")
@ -543,6 +546,42 @@ def _apply_update(engine, session, new_version, old_version): # noqa: C901
StatisticsMeta.__table__.create(engine) StatisticsMeta.__table__.create(engine)
StatisticsShortTerm.__table__.create(engine) StatisticsShortTerm.__table__.create(engine)
Statistics.__table__.create(engine) Statistics.__table__.create(engine)
# Block 5-minute statistics for one hour from the last run, or it will overlap
# with existing hourly statistics. Don't block on a database with no existing
# statistics.
if session.query(Statistics.id).count() and (
last_run_string := session.query(func.max(StatisticsRuns.start)).scalar()
):
last_run_start_time = process_timestamp(last_run_string)
if last_run_start_time:
fake_start_time = last_run_start_time + timedelta(minutes=5)
while fake_start_time < last_run_start_time + timedelta(hours=1):
session.add(StatisticsRuns(start=fake_start_time))
fake_start_time += timedelta(minutes=5)
# Copy last hourly statistic to the newly created 5-minute statistics table
sum_statistics = _get_metadata(
instance.hass, session, None, statistic_type="sum"
)
for metadata_id in sum_statistics:
last_statistic = (
session.query(Statistics)
.filter_by(metadata_id=metadata_id)
.order_by(Statistics.start.desc())
.first()
)
if last_statistic:
session.add(
StatisticsShortTerm(
metadata_id=last_statistic.metadata_id,
start=last_statistic.start,
last_reset=last_statistic.last_reset,
state=last_statistic.state,
sum=last_statistic.sum,
sum_increase=last_statistic.sum_increase,
)
)
else: else:
raise ValueError(f"No schema migration defined for version {new_version}") raise ValueError(f"No schema migration defined for version {new_version}")

View File

@ -65,7 +65,7 @@ async def test_schema_update_calls(hass):
assert await recorder.async_migration_in_progress(hass) is False assert await recorder.async_migration_in_progress(hass) is False
update.assert_has_calls( update.assert_has_calls(
[ [
call(hass.data[DATA_INSTANCE].engine, ANY, version + 1, 0) call(hass.data[DATA_INSTANCE], ANY, version + 1, 0)
for version in range(0, models.SCHEMA_VERSION) for version in range(0, models.SCHEMA_VERSION)
] ]
) )