From bf77c000eaf4b3e8ae6e607481f6a5a75cad9376 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 9 May 2022 23:00:19 -0500 Subject: [PATCH] Complete baked query conversion for recorder.history (#71618) --- homeassistant/components/recorder/history.py | 246 ++++++++++--------- 1 file changed, 131 insertions(+), 115 deletions(-) diff --git a/homeassistant/components/recorder/history.py b/homeassistant/components/recorder/history.py index d221ced3a84..a061bcd1329 100644 --- a/homeassistant/components/recorder/history.py +++ b/homeassistant/components/recorder/history.py @@ -11,6 +11,8 @@ from typing import Any, cast from sqlalchemy import Column, Text, and_, bindparam, func, or_ from sqlalchemy.ext import baked +from sqlalchemy.ext.baked import BakedQuery +from sqlalchemy.orm.query import Query from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import literal @@ -104,26 +106,6 @@ QUERY_STATES_NO_LAST_UPDATED = [ HISTORY_BAKERY = "recorder_history_bakery" -def query_and_join_attributes( - hass: HomeAssistant, no_attributes: bool -) -> tuple[list[Column], bool]: - """Return the query keys and if StateAttributes should be joined.""" - # If no_attributes was requested we do the query - # without the attributes fields and do not join the - # state_attributes table - if no_attributes: - return QUERY_STATE_NO_ATTR, False - # If we in the process of migrating schema we do - # not want to join the state_attributes table as we - # do not know if it will be there yet - if recorder.get_instance(hass).schema_version < 25: - return QUERY_STATES_PRE_SCHEMA_25, False - # Finally if no migration is in progress and no_attributes - # was not requested, we query both attributes columns and - # join state_attributes - return QUERY_STATES, True - - def bake_query_and_join_attributes( hass: HomeAssistant, no_attributes: bool, include_last_updated: bool = True ) -> tuple[Any, bool]: @@ -138,9 +120,9 @@ def bake_query_and_join_attributes( # state_attributes table if no_attributes: if include_last_updated: - return bakery(lambda session: session.query(*QUERY_STATE_NO_ATTR)), False + return bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR)), False return ( - bakery(lambda session: session.query(*QUERY_STATE_NO_ATTR_NO_LAST_UPDATED)), + bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR_NO_LAST_UPDATED)), False, ) # If we in the process of migrating schema we do @@ -149,23 +131,19 @@ def bake_query_and_join_attributes( if recorder.get_instance(hass).schema_version < 25: if include_last_updated: return ( - bakery(lambda session: session.query(*QUERY_STATES_PRE_SCHEMA_25)), + bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25)), False, ) return ( - bakery( - lambda session: session.query( - *QUERY_STATES_PRE_SCHEMA_25_NO_LAST_UPDATED - ) - ), + bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25_NO_LAST_UPDATED)), False, ) # Finally if no migration is in progress and no_attributes # was not requested, we query both attributes columns and # join state_attributes if include_last_updated: - return bakery(lambda session: session.query(*QUERY_STATES)), True - return bakery(lambda session: session.query(*QUERY_STATES_NO_LAST_UPDATED)), True + return bakery(lambda s: s.query(*QUERY_STATES)), True + return bakery(lambda s: s.query(*QUERY_STATES_NO_LAST_UPDATED)), True def async_setup(hass: HomeAssistant) -> None: @@ -200,6 +178,18 @@ def get_significant_states( ) +def _ignore_domains_filter(query: Query) -> Query: + """Add a filter to ignore domains we do not fetch history for.""" + return query.filter( + and_( + *[ + ~States.entity_id.like(entity_domain) + for entity_domain in IGNORE_DOMAINS_ENTITY_ID_LIKE + ] + ) + ) + + def _query_significant_states_with_session( hass: HomeAssistant, session: Session, @@ -243,14 +233,7 @@ def _query_significant_states_with_session( States.entity_id.in_(bindparam("entity_ids", expanding=True)) ) else: - baked_query += lambda q: q.filter( - and_( - *[ - ~States.entity_id.like(entity_domain) - for entity_domain in IGNORE_DOMAINS_ENTITY_ID_LIKE - ] - ) - ) + baked_query += _ignore_domains_filter if filters: filters.bake(baked_query) @@ -470,6 +453,94 @@ def get_last_state_changes( ) +def _most_recent_state_ids_entities_subquery(query: Query) -> Query: + """Query to find the most recent state id for specific entities.""" + # We got an include-list of entities, accelerate the query by filtering already + # in the inner query. + most_recent_state_ids = ( + query.session.query(func.max(States.state_id).label("max_state_id")) + .filter( + (States.last_updated >= bindparam("run_start")) + & (States.last_updated < bindparam("utc_point_in_time")) + ) + .filter(States.entity_id.in_(bindparam("entity_ids", expanding=True))) + .group_by(States.entity_id) + .subquery() + ) + return query.join( + most_recent_state_ids, + States.state_id == most_recent_state_ids.c.max_state_id, + ) + + +def _get_states_baked_query_for_entites( + hass: HomeAssistant, + no_attributes: bool = False, +) -> BakedQuery: + """Baked query to get states for specific entities.""" + baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes) + baked_query += _most_recent_state_ids_entities_subquery + if join_attributes: + baked_query += lambda q: q.outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) + ) + return baked_query + + +def _most_recent_state_ids_subquery(query: Query) -> Query: + """Find the most recent state ids for all entiites.""" + # We did not get an include-list of entities, query all states in the inner + # query, then filter out unwanted domains as well as applying the custom filter. + # This filtering can't be done in the inner query because the domain column is + # not indexed and we can't control what's in the custom filter. + most_recent_states_by_date = ( + query.session.query( + States.entity_id.label("max_entity_id"), + func.max(States.last_updated).label("max_last_updated"), + ) + .filter( + (States.last_updated >= bindparam("run_start")) + & (States.last_updated < bindparam("utc_point_in_time")) + ) + .group_by(States.entity_id) + .subquery() + ) + most_recent_state_ids = ( + query.session.query(func.max(States.state_id).label("max_state_id")) + .join( + most_recent_states_by_date, + and_( + States.entity_id == most_recent_states_by_date.c.max_entity_id, + States.last_updated == most_recent_states_by_date.c.max_last_updated, + ), + ) + .group_by(States.entity_id) + .subquery() + ) + return query.join( + most_recent_state_ids, + States.state_id == most_recent_state_ids.c.max_state_id, + ) + + +def _get_states_baked_query_for_all( + hass: HomeAssistant, + filters: Any | None = None, + no_attributes: bool = False, +) -> BakedQuery: + """Baked query to get states for all entities.""" + baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes) + baked_query += _most_recent_state_ids_subquery + baked_query += _ignore_domains_filter + if filters: + filters.bake(baked_query) + if join_attributes: + baked_query += lambda q: q.outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) + ) + return baked_query + + def _get_states_with_session( hass: HomeAssistant, session: Session, @@ -494,77 +565,22 @@ def _get_states_with_session( # We have more than one entity to look at so we need to do a query on states # since the last recorder run started. - query_keys, join_attributes = query_and_join_attributes(hass, no_attributes) - query = session.query(*query_keys) - if entity_ids: - # We got an include-list of entities, accelerate the query by filtering already - # in the inner query. - most_recent_state_ids = ( - session.query( - func.max(States.state_id).label("max_state_id"), - ) - .filter( - (States.last_updated >= run.start) - & (States.last_updated < utc_point_in_time) - ) - .filter(States.entity_id.in_(entity_ids)) - ) - most_recent_state_ids = most_recent_state_ids.group_by(States.entity_id) - most_recent_state_ids = most_recent_state_ids.subquery() - query = query.join( - most_recent_state_ids, - States.state_id == most_recent_state_ids.c.max_state_id, - ) - if join_attributes: - query = query.outerjoin( - StateAttributes, (States.attributes_id == StateAttributes.attributes_id) - ) + baked_query = _get_states_baked_query_for_entites(hass, no_attributes) else: - # We did not get an include-list of entities, query all states in the inner - # query, then filter out unwanted domains as well as applying the custom filter. - # This filtering can't be done in the inner query because the domain column is - # not indexed and we can't control what's in the custom filter. - most_recent_states_by_date = ( - session.query( - States.entity_id.label("max_entity_id"), - func.max(States.last_updated).label("max_last_updated"), - ) - .filter( - (States.last_updated >= run.start) - & (States.last_updated < utc_point_in_time) - ) - .group_by(States.entity_id) - .subquery() - ) - most_recent_state_ids = ( - session.query(func.max(States.state_id).label("max_state_id")) - .join( - most_recent_states_by_date, - and_( - States.entity_id == most_recent_states_by_date.c.max_entity_id, - States.last_updated - == most_recent_states_by_date.c.max_last_updated, - ), - ) - .group_by(States.entity_id) - .subquery() - ) - query = query.join( - most_recent_state_ids, - States.state_id == most_recent_state_ids.c.max_state_id, - ) - for entity_domain in IGNORE_DOMAINS_ENTITY_ID_LIKE: - query = query.filter(~States.entity_id.like(entity_domain)) - if filters: - query = filters.apply(query) - if join_attributes: - query = query.outerjoin( - StateAttributes, (States.attributes_id == StateAttributes.attributes_id) - ) + baked_query = _get_states_baked_query_for_all(hass, filters, no_attributes) attr_cache: dict[str, dict[str, Any]] = {} - return [LazyState(row, attr_cache) for row in execute(query)] + return [ + LazyState(row, attr_cache) + for row in execute( + baked_query(session).params( + run_start=run.start, + utc_point_in_time=utc_point_in_time, + entity_ids=entity_ids, + ) + ) + ] def _get_single_entity_states_with_session( @@ -585,8 +601,7 @@ def _get_single_entity_states_with_session( baked_query += lambda q: q.outerjoin( StateAttributes, States.attributes_id == StateAttributes.attributes_id ) - baked_query += lambda q: q.order_by(States.last_updated.desc()) - baked_query += lambda q: q.limit(1) + baked_query += lambda q: q.order_by(States.last_updated.desc()).limit(1) query = baked_query(session).params( utc_point_in_time=utc_point_in_time, entity_id=entity_id @@ -634,8 +649,8 @@ def _sorted_states_to_dict( filters=filters, no_attributes=no_attributes, ): - state.last_changed = start_time state.last_updated = start_time + state.last_changed = start_time result[state.entity_id].append(state) if _LOGGER.isEnabledFor(logging.DEBUG): @@ -671,31 +686,32 @@ def _sorted_states_to_dict( continue ent_results.append(LazyState(first_state, attr_cache)) - prev_state = ent_results[-1] - assert isinstance(prev_state, LazyState) + assert isinstance(ent_results[-1], State) + prev_state: Column | str = ent_results[-1].state initial_state_count = len(ent_results) + db_state = None for db_state in group: # With minimal response we do not care about attribute # changes so we can filter out duplicate states - if db_state.state == prev_state.state: + if (state := db_state.state) == prev_state: continue ent_results.append( { - STATE_KEY: db_state.state, + STATE_KEY: state, LAST_CHANGED_KEY: _process_timestamp_to_utc_isoformat( db_state.last_changed ), } ) - prev_state = db_state + prev_state = state - if prev_state and len(ent_results) != initial_state_count: + if db_state and len(ent_results) != initial_state_count: # There was at least one state change # replace the last minimal state with # a full state - ent_results[-1] = LazyState(prev_state, attr_cache) + ent_results[-1] = LazyState(db_state, attr_cache) # Filter out the empty lists if some states had 0 results. return {key: val for key, val in result.items() if val}