Fix history queries while the database migration is in progress (#68598)

J. Nick Koston 2022-03-24 09:49:13 -10:00 committed by GitHub
parent e911936a0d
commit a566d3943c
2 changed files with 241 additions and 22 deletions
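
For context on what the change guards against: when attributes were requested, the history queries built before this fix always selected StateAttributes.shared_attrs and outer-joined the state_attributes table, which does not exist until the schema 25 migration has created it. A minimal sketch of that failure mode using plain sqlite3; the table layout here is a simplified stand-in for illustration, not the recorder's real schema:

import sqlite3

con = sqlite3.connect(":memory:")
# Simplified stand-in for a pre-migration database: states exists, state_attributes does not.
con.execute(
    "CREATE TABLE states (state_id INTEGER PRIMARY KEY, state TEXT, attributes_id INTEGER)"
)

# Joining a table the migration has not created yet fails outright, which is why the
# helpers in this commit fall back to the pre-schema-25 column set while
# migration_in_progress is set.
try:
    con.execute(
        "SELECT states.state, state_attributes.shared_attrs "
        "FROM states LEFT OUTER JOIN state_attributes "
        "ON states.attributes_id = state_attributes.attributes_id"
    ).fetchall()
except sqlite3.OperationalError as err:
    print(err)  # no such table: state_attributes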

homeassistant/components/recorder/history.py

@@ -6,8 +6,9 @@ from datetime import datetime
from itertools import groupby
import logging
import time
from typing import Any
from sqlalchemy import Text, and_, bindparam, func, or_
from sqlalchemy import Column, Text, and_, bindparam, func, or_
from sqlalchemy.ext import baked
from sqlalchemy.sql.expression import literal
@@ -59,8 +60,17 @@ QUERY_STATE_NO_ATTR = [
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
# Remove QUERY_STATES_PRE_SCHEMA_25
# and the migration_in_progress check
# once schema 26 is created
QUERY_STATES_PRE_SCHEMA_25 = [
*BASE_STATES,
States.attributes,
literal(value=None, type_=Text).label("shared_attrs"),
]
QUERY_STATES = [
*BASE_STATES,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
]
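
The literal(value=None, type_=Text).label(...) placeholders above keep every column set selecting the same labels, so code that reads the rows never has to care which variant built the query. A minimal, self-contained sketch of the pattern, assuming SQLAlchemy 1.4+; the States model and in-memory engine here are toy stand-ins for illustration, not the recorder's real models:

from sqlalchemy import Column, Integer, Text, create_engine, literal
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class States(Base):  # toy stand-in, not homeassistant.components.recorder.models.States
    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    state = Column(Text)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(States(state="on"))
    session.commit()
    # A NULL literal labelled "shared_attrs" gives this query the same column names
    # as the full post-migration query that selects the real shared_attrs column.
    row = session.query(
        States.state, literal(value=None, type_=Text).label("shared_attrs")
    ).one()
    print(row.state, row.shared_attrs)  # on None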
@@ -68,6 +78,51 @@ QUERY_STATES = [
HISTORY_BAKERY = "recorder_history_bakery"
def query_and_join_attributes(
hass: HomeAssistant, no_attributes: bool
) -> tuple[list[Column], bool]:
"""Return the query keys and if StateAttributes should be joined."""
# If no_attributes was requested we do the query
# without the attributes fields and do not join the
# state_attributes table
if no_attributes:
return QUERY_STATE_NO_ATTR, False
# If we are in the process of migrating the schema we do
# not want to join the state_attributes table as we
# do not know if it will be there yet
if recorder.get_instance(hass).migration_in_progress:
return QUERY_STATES_PRE_SCHEMA_25, False
# Finally if no migration is in progress and no_attributes
# was not requested, we query both attributes columns and
# join state_attributes
return QUERY_STATES, True
def bake_query_and_join_attributes(
hass: HomeAssistant, no_attributes: bool
) -> tuple[Any, bool]:
"""Return the initial backed query and if StateAttributes should be joined.
Because these are baked queries the values inside the lambdas need
to be explicitly written out to avoid caching the wrong values.
"""
bakery: baked.bakery = hass.data[HISTORY_BAKERY]
# If no_attributes was requested we do the query
# without the attributes fields and do not join the
# state_attributes table
if no_attributes:
return bakery(lambda session: session.query(*QUERY_STATE_NO_ATTR)), False
# If we are in the process of migrating the schema we do
# not want to join the state_attributes table as we
# do not know if it will be there yet
if recorder.get_instance(hass).migration_in_progress:
return bakery(lambda session: session.query(*QUERY_STATES_PRE_SCHEMA_25)), False
# Finally if no migration is in progress and no_attributes
# was not requested, we query both attributes columns and
# join state_attributes
return bakery(lambda session: session.query(*QUERY_STATES)), True
def async_setup(hass):
"""Set up the history hooks."""
hass.data[HISTORY_BAKERY] = baked.bakery()
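
The docstring of bake_query_and_join_attributes above is the key constraint of this approach: a bakery caches whatever the lambda builds the first time it runs, keyed on the lambda's source location, so a value captured from the enclosing scope would be frozen into the cache for every later caller. A toy stand-in (not SQLAlchemy's real baked-query machinery) showing the pitfall that is avoided by spelling out each column list in its own lambda:

class FakeBakery:
    """Toy cache keyed on the lambda's code object, mimicking the baked-query idea."""

    def __init__(self):
        self._cache = {}

    def __call__(self, build):
        key = build.__code__  # same key every time this source line is reached
        if key not in self._cache:
            self._cache[key] = build()
        return self._cache[key]


bakery = FakeBakery()


def broken_build(query_keys):
    # The lambda captures query_keys, but the bakery only ever calls it once,
    # so the first caller's columns are served to every later caller.
    return bakery(lambda: list(query_keys))


print(broken_build(["state"]))                # ['state']
print(broken_build(["state", "attributes"]))  # still ['state'] -- stale cached value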
@@ -104,8 +159,7 @@ def get_significant_states_with_session(
thermostat so that we get current temperature in our graphs).
"""
timer_start = time.perf_counter()
query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES
baked_query = hass.data[HISTORY_BAKERY](lambda session: session.query(*query_keys))
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
if entity_ids is not None and len(entity_ids) == 1:
if (
@@ -146,7 +200,7 @@ def get_significant_states_with_session(
if end_time is not None:
baked_query += lambda q: q.filter(States.last_updated < bindparam("end_time"))
if not no_attributes:
if join_attributes:
baked_query += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
@@ -187,9 +241,8 @@ def state_changes_during_period(
) -> dict[str, list[State]]:
"""Return states changes during UTC period start_time - end_time."""
with session_scope(hass=hass) as session:
query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES
baked_query = hass.data[HISTORY_BAKERY](
lambda session: session.query(*query_keys)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes
)
baked_query += lambda q: q.filter(
@@ -206,7 +259,7 @@ def state_changes_during_period(
baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id"))
entity_id = entity_id.lower()
if not no_attributes:
if join_attributes:
baked_query += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
@@ -240,18 +293,18 @@ def get_last_state_changes(hass, number_of_states, entity_id):
start_time = dt_util.utcnow()
with session_scope(hass=hass) as session:
baked_query = hass.data[HISTORY_BAKERY](
lambda session: session.query(*QUERY_STATES)
)
baked_query, join_attributes = bake_query_and_join_attributes(hass, False)
baked_query += lambda q: q.filter(States.last_changed == States.last_updated)
if entity_id is not None:
baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id"))
entity_id = entity_id.lower()
baked_query += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
if join_attributes:
baked_query += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
baked_query += lambda q: q.order_by(
States.entity_id, States.last_updated.desc()
)
@@ -322,7 +375,7 @@ def _get_states_with_session(
# We have more than one entity to look at so we need to do a query on states
# since the last recorder run started.
query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES
query_keys, join_attributes = query_and_join_attributes(hass, no_attributes)
query = session.query(*query_keys)
if entity_ids:
@@ -344,7 +397,7 @@ def _get_states_with_session(
most_recent_state_ids,
States.state_id == most_recent_state_ids.c.max_state_id,
)
if not no_attributes:
if join_attributes:
query = query.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
@@ -386,7 +439,7 @@ def _get_states_with_session(
query = query.filter(~States.entity_id.like(entity_domain))
if filters:
query = filters.apply(query)
if not no_attributes:
if join_attributes:
query = query.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
@@ -400,13 +453,12 @@ def _get_single_entity_states_with_session(
):
# Use an entirely different (and extremely fast) query if we only
# have a single entity id
query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES
baked_query = hass.data[HISTORY_BAKERY](lambda session: session.query(*query_keys))
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query += lambda q: q.filter(
States.last_updated < bindparam("utc_point_in_time"),
States.entity_id == bindparam("entity_id"),
)
if not no_attributes:
if join_attributes:
baked_query += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)

tests/components/recorder/test_history.py

@@ -1,22 +1,64 @@
"""The tests the History component."""
# pylint: disable=protected-access,invalid-name
from copy import copy
from datetime import timedelta
from datetime import datetime, timedelta
import json
from unittest.mock import patch, sentinel
import pytest
from homeassistant.components import recorder
from homeassistant.components.recorder import history
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.components.recorder.models import (
Events,
StateAttributes,
States,
process_timestamp,
)
import homeassistant.core as ha
from homeassistant.helpers.json import JSONEncoder
import homeassistant.util.dt as dt_util
from .conftest import SetupRecorderInstanceT
from tests.common import mock_state_change_event
from tests.components.recorder.common import wait_recording_done
def _add_db_entries(
hass: ha.HomeAssistant, point: datetime, entity_ids: list[str]
) -> None:
"""Seed States rows with legacy inline attributes plus linked StateAttributes rows whose shared_attrs differ, so the tests can tell which source provided the attributes."""
with recorder.session_scope(hass=hass) as session:
for idx, entity_id in enumerate(entity_ids):
session.add(
Events(
event_id=1001 + idx,
event_type="state_changed",
event_data="{}",
origin="LOCAL",
time_fired=point,
)
)
session.add(
States(
entity_id=entity_id,
state="on",
attributes='{"name":"the light"}',
last_changed=point,
last_updated=point,
event_id=1001 + idx,
attributes_id=1002 + idx,
)
)
session.add(
StateAttributes(
shared_attrs='{"name":"the shared light"}',
hash=1234 + idx,
attributes_id=1002 + idx,
)
)
def _setup_get_states(hass):
"""Set up for testing get_states."""
states = []
@@ -501,3 +543,128 @@ def record_states(hass):
)
return zero, four, states
async def test_state_changes_during_period_query_during_migration_to_schema_25(
hass: ha.HomeAssistant,
async_setup_recorder_instance: SetupRecorderInstanceT,
):
"""Test we can query data prior to schema 25 and during migration to schema 25."""
instance = await async_setup_recorder_instance(hass, {})
start = dt_util.utcnow()
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
entity_id = "light.test"
await hass.async_add_executor_job(_add_db_entries, hass, point, [entity_id])
no_attributes = True
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes, include_start_time_state=False
)
state = hist[entity_id][0]
assert state.attributes == {}
no_attributes = False
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes, include_start_time_state=False
)
state = hist[entity_id][0]
assert state.attributes == {"name": "the shared light"}
instance.engine.execute("update states set attributes_id=NULL;")
instance.engine.execute("drop table state_attributes;")
with patch.object(instance, "migration_in_progress", True):
no_attributes = True
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes, include_start_time_state=False
)
state = hist[entity_id][0]
assert state.attributes == {}
no_attributes = False
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes, include_start_time_state=False
)
state = hist[entity_id][0]
assert state.attributes == {"name": "the light"}
async def test_get_states_query_during_migration_to_schema_25(
hass: ha.HomeAssistant,
async_setup_recorder_instance: SetupRecorderInstanceT,
):
"""Test we can query data prior to schema 25 and during migration to schema 25."""
instance = await async_setup_recorder_instance(hass, {})
start = dt_util.utcnow()
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
entity_id = "light.test"
await hass.async_add_executor_job(_add_db_entries, hass, point, [entity_id])
no_attributes = True
hist = history.get_states(hass, end, [entity_id], no_attributes=no_attributes)
state = hist[0]
assert state.attributes == {}
no_attributes = False
hist = history.get_states(hass, end, [entity_id], no_attributes=no_attributes)
state = hist[0]
assert state.attributes == {"name": "the shared light"}
instance.engine.execute("update states set attributes_id=NULL;")
instance.engine.execute("drop table state_attributes;")
with patch.object(instance, "migration_in_progress", True):
no_attributes = True
hist = history.get_states(hass, end, [entity_id], no_attributes=no_attributes)
state = hist[0]
assert state.attributes == {}
no_attributes = False
hist = history.get_states(hass, end, [entity_id], no_attributes=no_attributes)
state = hist[0]
assert state.attributes == {"name": "the light"}
async def test_get_states_query_during_migration_to_schema_25_multiple_entities(
hass: ha.HomeAssistant,
async_setup_recorder_instance: SetupRecorderInstanceT,
):
"""Test we can query data prior to schema 25 and during migration to schema 25."""
instance = await async_setup_recorder_instance(hass, {})
start = dt_util.utcnow()
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
entity_id_1 = "light.test"
entity_id_2 = "switch.test"
entity_ids = [entity_id_1, entity_id_2]
await hass.async_add_executor_job(_add_db_entries, hass, point, entity_ids)
no_attributes = True
hist = history.get_states(hass, end, entity_ids, no_attributes=no_attributes)
assert hist[0].attributes == {}
assert hist[1].attributes == {}
no_attributes = False
hist = history.get_states(hass, end, entity_ids, no_attributes=no_attributes)
assert hist[0].attributes == {"name": "the shared light"}
assert hist[1].attributes == {"name": "the shared light"}
instance.engine.execute("update states set attributes_id=NULL;")
instance.engine.execute("drop table state_attributes;")
with patch.object(instance, "migration_in_progress", True):
no_attributes = True
hist = history.get_states(hass, end, entity_ids, no_attributes=no_attributes)
assert hist[0].attributes == {}
assert hist[1].attributes == {}
no_attributes = False
hist = history.get_states(hass, end, entity_ids, no_attributes=no_attributes)
assert hist[0].attributes == {"name": "the light"}
assert hist[1].attributes == {"name": "the light"}