diff --git a/homeassistant/components/recorder/history.py b/homeassistant/components/recorder/history.py index 6f5597ac268..9348e1cbb4b 100644 --- a/homeassistant/components/recorder/history.py +++ b/homeassistant/components/recorder/history.py @@ -6,8 +6,9 @@ from datetime import datetime from itertools import groupby import logging import time +from typing import Any -from sqlalchemy import Text, and_, bindparam, func, or_ +from sqlalchemy import Column, Text, and_, bindparam, func, or_ from sqlalchemy.ext import baked from sqlalchemy.sql.expression import literal @@ -59,8 +60,17 @@ QUERY_STATE_NO_ATTR = [ literal(value=None, type_=Text).label("attributes"), literal(value=None, type_=Text).label("shared_attrs"), ] +# Remove QUERY_STATES_PRE_SCHEMA_25 +# and the migration_in_progress check +# once schema 26 is created +QUERY_STATES_PRE_SCHEMA_25 = [ + *BASE_STATES, + States.attributes, + literal(value=None, type_=Text).label("shared_attrs"), +] QUERY_STATES = [ *BASE_STATES, + # Remove States.attributes once all attributes are in StateAttributes.shared_attrs States.attributes, StateAttributes.shared_attrs, ] @@ -68,6 +78,51 @@ QUERY_STATES = [ HISTORY_BAKERY = "recorder_history_bakery" +def query_and_join_attributes( + hass: HomeAssistant, no_attributes: bool +) -> tuple[list[Column], bool]: + """Return the query keys and if StateAttributes should be joined.""" + # If no_attributes was requested we do the query + # without the attributes fields and do not join the + # state_attributes table + if no_attributes: + return QUERY_STATE_NO_ATTR, False + # If we in the process of migrating schema we do + # not want to join the state_attributes table as we + # do not know if it will be there yet + if recorder.get_instance(hass).migration_in_progress: + return QUERY_STATES_PRE_SCHEMA_25, False + # Finally if no migration is in progress and no_attributes + # was not requested, we query both attributes columns and + # join state_attributes + return QUERY_STATES, True + + +def bake_query_and_join_attributes( + hass: HomeAssistant, no_attributes: bool +) -> tuple[Any, bool]: + """Return the initial backed query and if StateAttributes should be joined. + + Because these are baked queries the values inside the lambdas need + to be explicitly written out to avoid caching the wrong values. + """ + bakery: baked.bakery = hass.data[HISTORY_BAKERY] + # If no_attributes was requested we do the query + # without the attributes fields and do not join the + # state_attributes table + if no_attributes: + return bakery(lambda session: session.query(*QUERY_STATE_NO_ATTR)), False + # If we in the process of migrating schema we do + # not want to join the state_attributes table as we + # do not know if it will be there yet + if recorder.get_instance(hass).migration_in_progress: + return bakery(lambda session: session.query(*QUERY_STATES_PRE_SCHEMA_25)), False + # Finally if no migration is in progress and no_attributes + # was not requested, we query both attributes columns and + # join state_attributes + return bakery(lambda session: session.query(*QUERY_STATES)), True + + def async_setup(hass): """Set up the history hooks.""" hass.data[HISTORY_BAKERY] = baked.bakery() @@ -104,8 +159,7 @@ def get_significant_states_with_session( thermostat so that we get current temperature in our graphs). """ timer_start = time.perf_counter() - query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES - baked_query = hass.data[HISTORY_BAKERY](lambda session: session.query(*query_keys)) + baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes) if entity_ids is not None and len(entity_ids) == 1: if ( @@ -146,7 +200,7 @@ def get_significant_states_with_session( if end_time is not None: baked_query += lambda q: q.filter(States.last_updated < bindparam("end_time")) - if not no_attributes: + if join_attributes: baked_query += lambda q: q.outerjoin( StateAttributes, States.attributes_id == StateAttributes.attributes_id ) @@ -187,9 +241,8 @@ def state_changes_during_period( ) -> dict[str, list[State]]: """Return states changes during UTC period start_time - end_time.""" with session_scope(hass=hass) as session: - query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES - baked_query = hass.data[HISTORY_BAKERY]( - lambda session: session.query(*query_keys) + baked_query, join_attributes = bake_query_and_join_attributes( + hass, no_attributes ) baked_query += lambda q: q.filter( @@ -206,7 +259,7 @@ def state_changes_during_period( baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id")) entity_id = entity_id.lower() - if not no_attributes: + if join_attributes: baked_query += lambda q: q.outerjoin( StateAttributes, States.attributes_id == StateAttributes.attributes_id ) @@ -240,18 +293,18 @@ def get_last_state_changes(hass, number_of_states, entity_id): start_time = dt_util.utcnow() with session_scope(hass=hass) as session: - baked_query = hass.data[HISTORY_BAKERY]( - lambda session: session.query(*QUERY_STATES) - ) + baked_query, join_attributes = bake_query_and_join_attributes(hass, False) + baked_query += lambda q: q.filter(States.last_changed == States.last_updated) if entity_id is not None: baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id")) entity_id = entity_id.lower() - baked_query += lambda q: q.outerjoin( - StateAttributes, States.attributes_id == StateAttributes.attributes_id - ) + if join_attributes: + baked_query += lambda q: q.outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) baked_query += lambda q: q.order_by( States.entity_id, States.last_updated.desc() ) @@ -322,7 +375,7 @@ def _get_states_with_session( # We have more than one entity to look at so we need to do a query on states # since the last recorder run started. - query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES + query_keys, join_attributes = query_and_join_attributes(hass, no_attributes) query = session.query(*query_keys) if entity_ids: @@ -344,7 +397,7 @@ def _get_states_with_session( most_recent_state_ids, States.state_id == most_recent_state_ids.c.max_state_id, ) - if not no_attributes: + if join_attributes: query = query.outerjoin( StateAttributes, (States.attributes_id == StateAttributes.attributes_id) ) @@ -386,7 +439,7 @@ def _get_states_with_session( query = query.filter(~States.entity_id.like(entity_domain)) if filters: query = filters.apply(query) - if not no_attributes: + if join_attributes: query = query.outerjoin( StateAttributes, (States.attributes_id == StateAttributes.attributes_id) ) @@ -400,13 +453,12 @@ def _get_single_entity_states_with_session( ): # Use an entirely different (and extremely fast) query if we only # have a single entity id - query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES - baked_query = hass.data[HISTORY_BAKERY](lambda session: session.query(*query_keys)) + baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes) baked_query += lambda q: q.filter( States.last_updated < bindparam("utc_point_in_time"), States.entity_id == bindparam("entity_id"), ) - if not no_attributes: + if join_attributes: baked_query += lambda q: q.outerjoin( StateAttributes, States.attributes_id == StateAttributes.attributes_id ) diff --git a/tests/components/recorder/test_history.py b/tests/components/recorder/test_history.py index 5d1d72ca650..15f306ae549 100644 --- a/tests/components/recorder/test_history.py +++ b/tests/components/recorder/test_history.py @@ -1,22 +1,64 @@ """The tests the History component.""" # pylint: disable=protected-access,invalid-name from copy import copy -from datetime import timedelta +from datetime import datetime, timedelta import json from unittest.mock import patch, sentinel import pytest +from homeassistant.components import recorder from homeassistant.components.recorder import history -from homeassistant.components.recorder.models import process_timestamp +from homeassistant.components.recorder.models import ( + Events, + StateAttributes, + States, + process_timestamp, +) import homeassistant.core as ha from homeassistant.helpers.json import JSONEncoder import homeassistant.util.dt as dt_util +from .conftest import SetupRecorderInstanceT + from tests.common import mock_state_change_event from tests.components.recorder.common import wait_recording_done +def _add_db_entries( + hass: ha.HomeAssistant, point: datetime, entity_ids: list[str] +) -> None: + with recorder.session_scope(hass=hass) as session: + for idx, entity_id in enumerate(entity_ids): + session.add( + Events( + event_id=1001 + idx, + event_type="state_changed", + event_data="{}", + origin="LOCAL", + time_fired=point, + ) + ) + session.add( + States( + entity_id=entity_id, + state="on", + attributes='{"name":"the light"}', + last_changed=point, + last_updated=point, + event_id=1001 + idx, + attributes_id=1002 + idx, + ) + ) + session.add( + StateAttributes( + shared_attrs='{"name":"the shared light"}', + hash=1234 + idx, + attributes_id=1002 + idx, + ) + ) + + def _setup_get_states(hass): """Set up for testing get_states.""" states = [] @@ -501,3 +543,128 @@ def record_states(hass): ) return zero, four, states + + +async def test_state_changes_during_period_query_during_migration_to_schema_25( + hass: ha.HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, +): + """Test we can query data prior to schema 25 and during migration to schema 25.""" + instance = await async_setup_recorder_instance(hass, {}) + + start = dt_util.utcnow() + point = start + timedelta(seconds=1) + end = point + timedelta(seconds=1) + entity_id = "light.test" + await hass.async_add_executor_job(_add_db_entries, hass, point, [entity_id]) + + no_attributes = True + hist = history.state_changes_during_period( + hass, start, end, entity_id, no_attributes, include_start_time_state=False + ) + state = hist[entity_id][0] + assert state.attributes == {} + + no_attributes = False + hist = history.state_changes_during_period( + hass, start, end, entity_id, no_attributes, include_start_time_state=False + ) + state = hist[entity_id][0] + assert state.attributes == {"name": "the shared light"} + + instance.engine.execute("update states set attributes_id=NULL;") + instance.engine.execute("drop table state_attributes;") + + with patch.object(instance, "migration_in_progress", True): + no_attributes = True + hist = history.state_changes_during_period( + hass, start, end, entity_id, no_attributes, include_start_time_state=False + ) + state = hist[entity_id][0] + assert state.attributes == {} + + no_attributes = False + hist = history.state_changes_during_period( + hass, start, end, entity_id, no_attributes, include_start_time_state=False + ) + state = hist[entity_id][0] + assert state.attributes == {"name": "the light"} + + +async def test_get_states_query_during_migration_to_schema_25( + hass: ha.HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, +): + """Test we can query data prior to schema 25 and during migration to schema 25.""" + instance = await async_setup_recorder_instance(hass, {}) + + start = dt_util.utcnow() + point = start + timedelta(seconds=1) + end = point + timedelta(seconds=1) + entity_id = "light.test" + await hass.async_add_executor_job(_add_db_entries, hass, point, [entity_id]) + + no_attributes = True + hist = history.get_states(hass, end, [entity_id], no_attributes=no_attributes) + state = hist[0] + assert state.attributes == {} + + no_attributes = False + hist = history.get_states(hass, end, [entity_id], no_attributes=no_attributes) + state = hist[0] + assert state.attributes == {"name": "the shared light"} + + instance.engine.execute("update states set attributes_id=NULL;") + instance.engine.execute("drop table state_attributes;") + + with patch.object(instance, "migration_in_progress", True): + no_attributes = True + hist = history.get_states(hass, end, [entity_id], no_attributes=no_attributes) + state = hist[0] + assert state.attributes == {} + + no_attributes = False + hist = history.get_states(hass, end, [entity_id], no_attributes=no_attributes) + state = hist[0] + assert state.attributes == {"name": "the light"} + + +async def test_get_states_query_during_migration_to_schema_25_multiple_entities( + hass: ha.HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, +): + """Test we can query data prior to schema 25 and during migration to schema 25.""" + instance = await async_setup_recorder_instance(hass, {}) + + start = dt_util.utcnow() + point = start + timedelta(seconds=1) + end = point + timedelta(seconds=1) + entity_id_1 = "light.test" + entity_id_2 = "switch.test" + entity_ids = [entity_id_1, entity_id_2] + + await hass.async_add_executor_job(_add_db_entries, hass, point, entity_ids) + + no_attributes = True + hist = history.get_states(hass, end, entity_ids, no_attributes=no_attributes) + assert hist[0].attributes == {} + assert hist[1].attributes == {} + + no_attributes = False + hist = history.get_states(hass, end, entity_ids, no_attributes=no_attributes) + assert hist[0].attributes == {"name": "the shared light"} + assert hist[1].attributes == {"name": "the shared light"} + + instance.engine.execute("update states set attributes_id=NULL;") + instance.engine.execute("drop table state_attributes;") + + with patch.object(instance, "migration_in_progress", True): + no_attributes = True + hist = history.get_states(hass, end, entity_ids, no_attributes=no_attributes) + assert hist[0].attributes == {} + assert hist[1].attributes == {} + + no_attributes = False + hist = history.get_states(hass, end, entity_ids, no_attributes=no_attributes) + assert hist[0].attributes == {"name": "the light"} + assert hist[1].attributes == {"name": "the light"}