Complete baked query conversion for recorder.history (#71618)

J. Nick Koston 2022-05-09 23:00:19 -05:00 committed by GitHub
parent 3de7ffde54
commit bf77c000ea

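For readers new to the pattern: everything below leans on SQLAlchemy 1.4's legacy sqlalchemy.ext.baked extension, which caches the compiled SQL of a query built up from composable callables and binds concrete values only at execution time. The following is a minimal, self-contained sketch of that API, not part of this change; the States model and select_states helper are simplified stand-ins, not the recorder schema.

from sqlalchemy import Column, Integer, String, bindparam, create_engine
from sqlalchemy.ext import baked
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class States(Base):
    """Toy stand-in for the recorder States table."""

    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    entity_id = Column(String(255))
    state = Column(String(255))


# The bakery caches compiled SQL, keyed on the callables used to build a query.
bakery = baked.bakery()


def select_states(session: Session, entity_ids: list[str]) -> list[States]:
    """Return rows for the given entity_ids via a baked query."""
    baked_query = bakery(lambda s: s.query(States))
    # Build-up steps are plain callables that take and return a Query;
    # module-level functions work just as well as lambdas here.
    baked_query += lambda q: q.filter(
        States.entity_id.in_(bindparam("entity_ids", expanding=True))
    )
    # Concrete values are bound only at execution time via .params().
    return baked_query(session).params(entity_ids=entity_ids).all()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add_all(
        [
            States(entity_id="light.kitchen", state="on"),
            States(entity_id="sensor.temp", state="21.5"),
        ]
    )
    session.commit()
    print([row.entity_id for row in select_states(session, ["light.kitchen"])])

Because the build-up steps can be any callable that takes and returns a Query, the commit below can append module-level helpers such as _ignore_domains_filter directly to a baked query.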

@@ -11,6 +11,8 @@ from typing import Any, cast
 from sqlalchemy import Column, Text, and_, bindparam, func, or_
 from sqlalchemy.ext import baked
+from sqlalchemy.ext.baked import BakedQuery
+from sqlalchemy.orm.query import Query
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import literal
@@ -104,26 +106,6 @@ QUERY_STATES_NO_LAST_UPDATED = [
 HISTORY_BAKERY = "recorder_history_bakery"
 
 
-def query_and_join_attributes(
-    hass: HomeAssistant, no_attributes: bool
-) -> tuple[list[Column], bool]:
-    """Return the query keys and if StateAttributes should be joined."""
-    # If no_attributes was requested we do the query
-    # without the attributes fields and do not join the
-    # state_attributes table
-    if no_attributes:
-        return QUERY_STATE_NO_ATTR, False
-    # If we in the process of migrating schema we do
-    # not want to join the state_attributes table as we
-    # do not know if it will be there yet
-    if recorder.get_instance(hass).schema_version < 25:
-        return QUERY_STATES_PRE_SCHEMA_25, False
-    # Finally if no migration is in progress and no_attributes
-    # was not requested, we query both attributes columns and
-    # join state_attributes
-    return QUERY_STATES, True
-
-
 def bake_query_and_join_attributes(
     hass: HomeAssistant, no_attributes: bool, include_last_updated: bool = True
 ) -> tuple[Any, bool]:
@@ -138,9 +120,9 @@ def bake_query_and_join_attributes(
     # state_attributes table
     if no_attributes:
         if include_last_updated:
-            return bakery(lambda session: session.query(*QUERY_STATE_NO_ATTR)), False
+            return bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR)), False
         return (
-            bakery(lambda session: session.query(*QUERY_STATE_NO_ATTR_NO_LAST_UPDATED)),
+            bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR_NO_LAST_UPDATED)),
             False,
         )
     # If we in the process of migrating schema we do
@@ -149,23 +131,19 @@ def bake_query_and_join_attributes(
     if recorder.get_instance(hass).schema_version < 25:
         if include_last_updated:
             return (
-                bakery(lambda session: session.query(*QUERY_STATES_PRE_SCHEMA_25)),
+                bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25)),
                 False,
             )
         return (
-            bakery(
-                lambda session: session.query(
-                    *QUERY_STATES_PRE_SCHEMA_25_NO_LAST_UPDATED
-                )
-            ),
+            bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25_NO_LAST_UPDATED)),
             False,
         )
     # Finally if no migration is in progress and no_attributes
     # was not requested, we query both attributes columns and
     # join state_attributes
     if include_last_updated:
-        return bakery(lambda session: session.query(*QUERY_STATES)), True
-    return bakery(lambda session: session.query(*QUERY_STATES_NO_LAST_UPDATED)), True
+        return bakery(lambda s: s.query(*QUERY_STATES)), True
+    return bakery(lambda s: s.query(*QUERY_STATES_NO_LAST_UPDATED)), True
 
 
 def async_setup(hass: HomeAssistant) -> None:
@@ -200,6 +178,18 @@ def get_significant_states(
     )
 
 
+def _ignore_domains_filter(query: Query) -> Query:
+    """Add a filter to ignore domains we do not fetch history for."""
+    return query.filter(
+        and_(
+            *[
+                ~States.entity_id.like(entity_domain)
+                for entity_domain in IGNORE_DOMAINS_ENTITY_ID_LIKE
+            ]
+        )
+    )
+
+
 def _query_significant_states_with_session(
     hass: HomeAssistant,
     session: Session,
@@ -243,14 +233,7 @@ def _query_significant_states_with_session(
             States.entity_id.in_(bindparam("entity_ids", expanding=True))
         )
     else:
-        baked_query += lambda q: q.filter(
-            and_(
-                *[
-                    ~States.entity_id.like(entity_domain)
-                    for entity_domain in IGNORE_DOMAINS_ENTITY_ID_LIKE
-                ]
-            )
-        )
+        baked_query += _ignore_domains_filter
 
     if filters:
         filters.bake(baked_query)
@@ -470,6 +453,94 @@ def get_last_state_changes(
     )
 
 
+def _most_recent_state_ids_entities_subquery(query: Query) -> Query:
+    """Query to find the most recent state id for specific entities."""
+    # We got an include-list of entities, accelerate the query by filtering already
+    # in the inner query.
+    most_recent_state_ids = (
+        query.session.query(func.max(States.state_id).label("max_state_id"))
+        .filter(
+            (States.last_updated >= bindparam("run_start"))
+            & (States.last_updated < bindparam("utc_point_in_time"))
+        )
+        .filter(States.entity_id.in_(bindparam("entity_ids", expanding=True)))
+        .group_by(States.entity_id)
+        .subquery()
+    )
+    return query.join(
+        most_recent_state_ids,
+        States.state_id == most_recent_state_ids.c.max_state_id,
+    )
+
+
+def _get_states_baked_query_for_entites(
+    hass: HomeAssistant,
+    no_attributes: bool = False,
+) -> BakedQuery:
+    """Baked query to get states for specific entities."""
+    baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
+    baked_query += _most_recent_state_ids_entities_subquery
+    if join_attributes:
+        baked_query += lambda q: q.outerjoin(
+            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
+        )
+    return baked_query
+
+
+def _most_recent_state_ids_subquery(query: Query) -> Query:
+    """Find the most recent state ids for all entiites."""
+    # We did not get an include-list of entities, query all states in the inner
+    # query, then filter out unwanted domains as well as applying the custom filter.
+    # This filtering can't be done in the inner query because the domain column is
+    # not indexed and we can't control what's in the custom filter.
+    most_recent_states_by_date = (
+        query.session.query(
+            States.entity_id.label("max_entity_id"),
+            func.max(States.last_updated).label("max_last_updated"),
+        )
+        .filter(
+            (States.last_updated >= bindparam("run_start"))
+            & (States.last_updated < bindparam("utc_point_in_time"))
+        )
+        .group_by(States.entity_id)
+        .subquery()
+    )
+    most_recent_state_ids = (
+        query.session.query(func.max(States.state_id).label("max_state_id"))
+        .join(
+            most_recent_states_by_date,
+            and_(
+                States.entity_id == most_recent_states_by_date.c.max_entity_id,
+                States.last_updated == most_recent_states_by_date.c.max_last_updated,
+            ),
+        )
+        .group_by(States.entity_id)
+        .subquery()
+    )
+    return query.join(
+        most_recent_state_ids,
+        States.state_id == most_recent_state_ids.c.max_state_id,
+    )
+
+
+def _get_states_baked_query_for_all(
+    hass: HomeAssistant,
+    filters: Any | None = None,
+    no_attributes: bool = False,
+) -> BakedQuery:
+    """Baked query to get states for all entities."""
+    baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
+    baked_query += _most_recent_state_ids_subquery
+    baked_query += _ignore_domains_filter
+    if filters:
+        filters.bake(baked_query)
+    if join_attributes:
+        baked_query += lambda q: q.outerjoin(
+            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
+        )
+    return baked_query
+
+
 def _get_states_with_session(
     hass: HomeAssistant,
     session: Session,
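The two *_subquery helpers added above bake the classic greatest-n-per-group pattern: an inner query picks the maximum last_updated (or state_id) per entity_id, and the outer query joins back to States on that maximum to fetch the full row. Below is a plain, non-baked sketch of the same join shape, assuming only SQLAlchemy 1.4+; the States model here is a simplified stand-in, not the recorder schema.

from datetime import datetime, timedelta

from sqlalchemy import Column, DateTime, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class States(Base):
    """Toy stand-in for the recorder States table."""

    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    entity_id = Column(String(255), index=True)
    state = Column(String(255))
    last_updated = Column(DateTime, index=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
start = datetime(2022, 5, 9)

with Session(engine) as session:
    session.add_all(
        [
            States(entity_id=e, state=s, last_updated=start + timedelta(minutes=m))
            for e, s, m in [
                ("light.kitchen", "off", 1),
                ("light.kitchen", "on", 5),
                ("sensor.temp", "20.0", 2),
                ("sensor.temp", "21.5", 7),
            ]
        ]
    )
    session.commit()

    # Inner query: one row per entity_id with its newest last_updated.
    most_recent_by_date = (
        session.query(
            States.entity_id.label("max_entity_id"),
            func.max(States.last_updated).label("max_last_updated"),
        )
        .group_by(States.entity_id)
        .subquery()
    )
    # Outer query: join back to States to pull the full row for each maximum.
    latest = (
        session.query(States)
        .join(
            most_recent_by_date,
            (States.entity_id == most_recent_by_date.c.max_entity_id)
            & (States.last_updated == most_recent_by_date.c.max_last_updated),
        )
        .all()
    )
    for row in latest:
        print(row.entity_id, row.state)  # light.kitchen on / sensor.temp 21.5

The baked versions in the hunk above build the same joins but defer run_start, utc_point_in_time, and entity_ids to bindparam() so the compiled SQL can be reused across calls.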
@@ -494,77 +565,22 @@ def _get_states_with_session(
     # We have more than one entity to look at so we need to do a query on states
     # since the last recorder run started.
-    query_keys, join_attributes = query_and_join_attributes(hass, no_attributes)
-    query = session.query(*query_keys)
     if entity_ids:
-        # We got an include-list of entities, accelerate the query by filtering already
-        # in the inner query.
-        most_recent_state_ids = (
-            session.query(
-                func.max(States.state_id).label("max_state_id"),
-            )
-            .filter(
-                (States.last_updated >= run.start)
-                & (States.last_updated < utc_point_in_time)
-            )
-            .filter(States.entity_id.in_(entity_ids))
-        )
-        most_recent_state_ids = most_recent_state_ids.group_by(States.entity_id)
-        most_recent_state_ids = most_recent_state_ids.subquery()
-        query = query.join(
-            most_recent_state_ids,
-            States.state_id == most_recent_state_ids.c.max_state_id,
-        )
-        if join_attributes:
-            query = query.outerjoin(
-                StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
-            )
+        baked_query = _get_states_baked_query_for_entites(hass, no_attributes)
     else:
-        # We did not get an include-list of entities, query all states in the inner
-        # query, then filter out unwanted domains as well as applying the custom filter.
-        # This filtering can't be done in the inner query because the domain column is
-        # not indexed and we can't control what's in the custom filter.
-        most_recent_states_by_date = (
-            session.query(
-                States.entity_id.label("max_entity_id"),
-                func.max(States.last_updated).label("max_last_updated"),
-            )
-            .filter(
-                (States.last_updated >= run.start)
-                & (States.last_updated < utc_point_in_time)
-            )
-            .group_by(States.entity_id)
-            .subquery()
-        )
-        most_recent_state_ids = (
-            session.query(func.max(States.state_id).label("max_state_id"))
-            .join(
-                most_recent_states_by_date,
-                and_(
-                    States.entity_id == most_recent_states_by_date.c.max_entity_id,
-                    States.last_updated
-                    == most_recent_states_by_date.c.max_last_updated,
-                ),
-            )
-            .group_by(States.entity_id)
-            .subquery()
-        )
-        query = query.join(
-            most_recent_state_ids,
-            States.state_id == most_recent_state_ids.c.max_state_id,
-        )
-        for entity_domain in IGNORE_DOMAINS_ENTITY_ID_LIKE:
-            query = query.filter(~States.entity_id.like(entity_domain))
-        if filters:
-            query = filters.apply(query)
-        if join_attributes:
-            query = query.outerjoin(
-                StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
-            )
+        baked_query = _get_states_baked_query_for_all(hass, filters, no_attributes)
 
     attr_cache: dict[str, dict[str, Any]] = {}
-    return [LazyState(row, attr_cache) for row in execute(query)]
+    return [
+        LazyState(row, attr_cache)
+        for row in execute(
+            baked_query(session).params(
+                run_start=run.start,
+                utc_point_in_time=utc_point_in_time,
+                entity_ids=entity_ids,
+            )
+        )
+    ]
 
 
 def _get_single_entity_states_with_session(
@@ -585,8 +601,7 @@ def _get_single_entity_states_with_session(
         baked_query += lambda q: q.outerjoin(
             StateAttributes, States.attributes_id == StateAttributes.attributes_id
         )
-    baked_query += lambda q: q.order_by(States.last_updated.desc())
-    baked_query += lambda q: q.limit(1)
+    baked_query += lambda q: q.order_by(States.last_updated.desc()).limit(1)
 
     query = baked_query(session).params(
         utc_point_in_time=utc_point_in_time, entity_id=entity_id
@@ -634,8 +649,8 @@ def _sorted_states_to_dict(
             filters=filters,
             no_attributes=no_attributes,
         ):
-            state.last_changed = start_time
             state.last_updated = start_time
+            state.last_changed = start_time
             result[state.entity_id].append(state)
 
     if _LOGGER.isEnabledFor(logging.DEBUG):
@@ -671,31 +686,32 @@ def _sorted_states_to_dict(
                 continue
             ent_results.append(LazyState(first_state, attr_cache))
 
-        prev_state = ent_results[-1]
-        assert isinstance(prev_state, LazyState)
+        assert isinstance(ent_results[-1], State)
+        prev_state: Column | str = ent_results[-1].state
         initial_state_count = len(ent_results)
+        db_state = None
         for db_state in group:
             # With minimal response we do not care about attribute
             # changes so we can filter out duplicate states
-            if db_state.state == prev_state.state:
+            if (state := db_state.state) == prev_state:
                 continue
             ent_results.append(
                 {
-                    STATE_KEY: db_state.state,
+                    STATE_KEY: state,
                     LAST_CHANGED_KEY: _process_timestamp_to_utc_isoformat(
                         db_state.last_changed
                     ),
                 }
            )
-            prev_state = db_state
+            prev_state = state
 
-        if prev_state and len(ent_results) != initial_state_count:
+        if db_state and len(ent_results) != initial_state_count:
             # There was at least one state change
             # replace the last minimal state with
             # a full state
-            ent_results[-1] = LazyState(prev_state, attr_cache)
+            ent_results[-1] = LazyState(db_state, attr_cache)
 
     # Filter out the empty lists if some states had 0 results.
     return {key: val for key, val in result.items() if val}
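The last hunk tracks prev_state as the raw state string and db_state as the most recent row seen, so the minimal-response dedupe can compare strings directly and still swap in a full LazyState at the end. A toy sketch of that collapsing logic on plain dicts follows; it is illustrative only, not the recorder implementation, and the Row fields are hypothetical.

from typing import Any

# Toy rows standing in for States database rows (hypothetical fields).
Row = dict[str, Any]


def collapse_minimal(rows: list[Row]) -> list[Row]:
    """Keep only rows whose state differs from the previously kept state."""
    if not rows:
        return []
    results: list[Row] = [rows[0]]  # the first state is kept in full
    prev_state = rows[0]["state"]
    last_db_row: Row | None = None
    for row in rows[1:]:
        if (state := row["state"]) == prev_state:
            # Minimal response: attribute-only changes are dropped.
            continue
        results.append({"state": state, "last_changed": row["last_changed"]})
        prev_state = state
        last_db_row = row
    if last_db_row is not None:
        # Swap the trailing minimal entry for the full row, mirroring the
        # LazyState replacement at the end of the loop in the hunk above.
        results[-1] = last_db_row
    return results


print(
    collapse_minimal(
        [
            {"state": "on", "last_changed": "09:00", "attributes": "{}"},
            {"state": "on", "last_changed": "09:05", "attributes": '{"x": 1}'},
            {"state": "off", "last_changed": "09:10", "attributes": "{}"},
        ]
    )
)
# The 09:05 row is dropped as a duplicate state; the final "off" row is kept in full.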