Avoid storing last_changed in the database if it's the same as last_updated (#71843)

This commit is contained in:
J. Nick Koston 2022-05-14 15:06:31 -04:00 committed by GitHub
parent 532b3d780f
commit 8c2743bb67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 189 additions and 67 deletions

View File

@ -270,7 +270,9 @@ def _legacy_select_events_context_id(
NOT_CONTEXT_ONLY,
)
.outerjoin(States, (Events.event_id == States.event_id))
.where(States.last_updated == States.last_changed)
.where(
(States.last_updated == States.last_changed) | States.last_changed.is_(None)
)
.where(_not_continuous_entity_matcher())
.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
@ -302,7 +304,7 @@ def _select_states(start_day: dt, end_day: dt) -> Select:
"event_type"
),
literal(value=None, type_=sqlalchemy.Text).label("event_data"),
States.last_changed.label("time_fired"),
States.last_updated.label("time_fired"),
States.context_id.label("context_id"),
States.context_user_id.label("context_user_id"),
States.context_parent_id.label("context_parent_id"),
@ -314,7 +316,9 @@ def _select_states(start_day: dt, end_day: dt) -> Select:
.outerjoin(old_state, (States.old_state_id == old_state.state_id))
.where(_missing_state_matcher(old_state))
.where(_not_continuous_entity_matcher())
.where(States.last_updated == States.last_changed)
.where(
(States.last_updated == States.last_changed) | States.last_changed.is_(None)
)
.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)

View File

@ -68,19 +68,19 @@ BASE_STATES = [
States.last_changed,
States.last_updated,
]
BASE_STATES_NO_LAST_UPDATED = [
BASE_STATES_NO_LAST_CHANGED = [
States.entity_id,
States.state,
States.last_changed,
literal(value=None, type_=Text).label("last_updated"),
literal(value=None, type_=Text).label("last_changed"),
States.last_updated,
]
QUERY_STATE_NO_ATTR = [
*BASE_STATES,
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
QUERY_STATE_NO_ATTR_NO_LAST_UPDATED = [
*BASE_STATES_NO_LAST_UPDATED,
QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
literal(value=None, type_=Text).label("attributes"),
literal(value=None, type_=Text).label("shared_attrs"),
]
@ -92,8 +92,8 @@ QUERY_STATES_PRE_SCHEMA_25 = [
States.attributes,
literal(value=None, type_=Text).label("shared_attrs"),
]
QUERY_STATES_PRE_SCHEMA_25_NO_LAST_UPDATED = [
*BASE_STATES_NO_LAST_UPDATED,
QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
States.attributes,
literal(value=None, type_=Text).label("shared_attrs"),
]
@ -103,8 +103,8 @@ QUERY_STATES = [
States.attributes,
StateAttributes.shared_attrs,
]
QUERY_STATES_NO_LAST_UPDATED = [
*BASE_STATES_NO_LAST_UPDATED,
QUERY_STATES_NO_LAST_CHANGED = [
*BASE_STATES_NO_LAST_CHANGED,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
@ -114,7 +114,7 @@ HISTORY_BAKERY = "recorder_history_bakery"
def bake_query_and_join_attributes(
hass: HomeAssistant, no_attributes: bool, include_last_updated: bool = True
hass: HomeAssistant, no_attributes: bool, include_last_changed: bool = True
) -> tuple[Any, bool]:
"""Return the initial backed query and if StateAttributes should be joined.
@ -126,31 +126,31 @@ def bake_query_and_join_attributes(
# without the attributes fields and do not join the
# state_attributes table
if no_attributes:
if include_last_updated:
if include_last_changed:
return bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR)), False
return (
bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR_NO_LAST_UPDATED)),
bakery(lambda s: s.query(*QUERY_STATE_NO_ATTR_NO_LAST_CHANGED)),
False,
)
# If we in the process of migrating schema we do
# not want to join the state_attributes table as we
# do not know if it will be there yet
if recorder.get_instance(hass).schema_version < 25:
if include_last_updated:
if include_last_changed:
return (
bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25)),
False,
)
return (
bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25_NO_LAST_UPDATED)),
bakery(lambda s: s.query(*QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED)),
False,
)
# Finally if no migration is in progress and no_attributes
# was not requested, we query both attributes columns and
# join state_attributes
if include_last_updated:
if include_last_changed:
return bakery(lambda s: s.query(*QUERY_STATES)), True
return bakery(lambda s: s.query(*QUERY_STATES_NO_LAST_UPDATED)), True
return bakery(lambda s: s.query(*QUERY_STATES_NO_LAST_CHANGED)), True
def async_setup(hass: HomeAssistant) -> None:
@ -213,7 +213,9 @@ def _query_significant_states_with_session(
if _LOGGER.isEnabledFor(logging.DEBUG):
timer_start = time.perf_counter()
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_changed=True
)
if entity_ids is not None and len(entity_ids) == 1:
if (
@ -221,10 +223,11 @@ def _query_significant_states_with_session(
and split_entity_id(entity_ids[0])[0] not in SIGNIFICANT_DOMAINS
):
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_updated=False
hass, no_attributes, include_last_changed=False
)
baked_query += lambda q: q.filter(
States.last_changed == States.last_updated
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
)
elif significant_changes_only:
baked_query += lambda q: q.filter(
@ -233,7 +236,10 @@ def _query_significant_states_with_session(
States.entity_id.like(entity_domain)
for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
],
(States.last_changed == States.last_updated),
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
),
)
)
@ -360,11 +366,14 @@ def state_changes_during_period(
"""Return states changes during UTC period start_time - end_time."""
with session_scope(hass=hass) as session:
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_updated=False
hass, no_attributes, include_last_changed=False
)
baked_query += lambda q: q.filter(
(States.last_changed == States.last_updated)
(
(States.last_changed == States.last_updated)
| States.last_changed.is_(None)
)
& (States.last_updated > bindparam("start_time"))
)
@ -424,10 +433,12 @@ def get_last_state_changes(
with session_scope(hass=hass) as session:
baked_query, join_attributes = bake_query_and_join_attributes(
hass, False, include_last_updated=False
hass, False, include_last_changed=False
)
baked_query += lambda q: q.filter(States.last_changed == States.last_updated)
baked_query += lambda q: q.filter(
(States.last_changed == States.last_updated) | States.last_changed.is_(None)
)
if entity_id is not None:
baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id"))
@ -489,7 +500,9 @@ def _get_states_baked_query_for_entites(
no_attributes: bool = False,
) -> BakedQuery:
"""Baked query to get states for specific entities."""
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_changed=True
)
baked_query += _most_recent_state_ids_entities_subquery
if join_attributes:
baked_query += lambda q: q.outerjoin(
@ -540,7 +553,9 @@ def _get_states_baked_query_for_all(
no_attributes: bool = False,
) -> BakedQuery:
"""Baked query to get states for all entities."""
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_changed=True
)
baked_query += _most_recent_state_ids_subquery
baked_query += _ignore_domains_filter
if filters:
@ -599,7 +614,9 @@ def _get_single_entity_states_with_session(
) -> list[Row]:
# Use an entirely different (and extremely fast) query if we only
# have a single entity id
baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
baked_query, join_attributes = bake_query_and_join_attributes(
hass, no_attributes, include_last_changed=True
)
baked_query += lambda q: q.filter(
States.last_updated < bindparam("utc_point_in_time"),
States.entity_id == bindparam("entity_id"),
@ -720,7 +737,12 @@ def _sorted_states_to_dict(
ent_results.append(
{
attr_state: state,
attr_last_changed: _process_timestamp(row.last_changed),
#
# minimal_response only makes sense with last_changed == last_updated
#
# We use last_updated for last_changed since it's the same
#
attr_last_changed: _process_timestamp(row.last_updated),
}
)
prev_state = state

View File

@ -250,7 +250,7 @@ class States(Base): # type: ignore[misc,valid-type]
event_id = Column(
Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
)
last_changed = Column(DATETIME_TYPE, default=dt_util.utcnow)
last_changed = Column(DATETIME_TYPE)
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
attributes_id = Column(
@ -291,12 +291,16 @@ class States(Base): # type: ignore[misc,valid-type]
# None state means the state was removed from the state machine
if state is None:
dbstate.state = ""
dbstate.last_changed = event.time_fired
dbstate.last_updated = event.time_fired
dbstate.last_changed = None
return dbstate
dbstate.state = state.state
dbstate.last_updated = state.last_updated
if state.last_updated == state.last_changed:
dbstate.last_changed = None
else:
dbstate.state = state.state
dbstate.last_changed = state.last_changed
dbstate.last_updated = state.last_updated
return dbstate
@ -308,21 +312,27 @@ class States(Base): # type: ignore[misc,valid-type]
parent_id=self.context_parent_id,
)
try:
return State(
self.entity_id,
self.state,
# Join the state_attributes table on attributes_id to get the attributes
# for newer states
json.loads(self.attributes) if self.attributes else {},
process_timestamp(self.last_changed),
process_timestamp(self.last_updated),
context=context,
validate_entity_id=validate_entity_id,
)
attrs = json.loads(self.attributes) if self.attributes else {}
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state: %s", self)
return None
if self.last_changed is None or self.last_changed == self.last_updated:
last_changed = last_updated = process_timestamp(self.last_updated)
else:
last_updated = process_timestamp(self.last_updated)
last_changed = process_timestamp(self.last_changed)
return State(
self.entity_id,
self.state,
# Join the state_attributes table on attributes_id to get the attributes
# for newer states
attrs,
last_changed,
last_updated,
context=context,
validate_entity_id=validate_entity_id,
)
class StateAttributes(Base): # type: ignore[misc,valid-type]
@ -708,7 +718,10 @@ class LazyState(State):
def last_changed(self) -> datetime: # type: ignore[override]
"""Last changed datetime."""
if self._last_changed is None:
self._last_changed = process_timestamp(self._row.last_changed)
if (last_changed := self._row.last_changed) is not None:
self._last_changed = process_timestamp(last_changed)
else:
self._last_changed = self.last_updated
return self._last_changed
@last_changed.setter
@ -720,10 +733,7 @@ class LazyState(State):
def last_updated(self) -> datetime: # type: ignore[override]
"""Last updated datetime."""
if self._last_updated is None:
if (last_updated := self._row.last_updated) is not None:
self._last_updated = process_timestamp(last_updated)
else:
self._last_updated = self.last_changed
self._last_updated = process_timestamp(self._row.last_updated)
return self._last_updated
@last_updated.setter
@ -739,24 +749,24 @@ class LazyState(State):
To be used for JSON serialization.
"""
if self._last_changed is None and self._last_updated is None:
last_changed_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_changed
last_updated_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_updated
)
if (
self._row.last_updated is None
self._row.last_changed is None
or self._row.last_changed == self._row.last_updated
):
last_updated_isoformat = last_changed_isoformat
last_changed_isoformat = last_updated_isoformat
else:
last_updated_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_updated
last_changed_isoformat = process_timestamp_to_utc_isoformat(
self._row.last_changed
)
else:
last_changed_isoformat = self.last_changed.isoformat()
last_updated_isoformat = self.last_updated.isoformat()
if self.last_changed == self.last_updated:
last_updated_isoformat = last_changed_isoformat
last_changed_isoformat = last_updated_isoformat
else:
last_updated_isoformat = self.last_updated.isoformat()
last_changed_isoformat = self.last_changed.isoformat()
return {
"entity_id": self.entity_id,
"state": self.state,
@ -801,13 +811,13 @@ def row_to_compressed_state(
if start_time:
last_changed = last_updated = start_time.timestamp()
else:
row_changed_changed: datetime = row.last_changed
row_last_updated: datetime = row.last_updated
if (
not (row_last_updated := row.last_updated)
not (row_changed_changed := row.last_changed)
or row_last_updated == row_changed_changed
):
last_changed = last_updated = process_datetime_to_timestamp(
row_changed_changed
row_last_updated
)
else:
last_changed = process_datetime_to_timestamp(row_changed_changed)

View File

@ -22,12 +22,15 @@ from homeassistant.components.recorder.models import (
)
from homeassistant.components.recorder.util import session_scope
import homeassistant.core as ha
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.json import JSONEncoder
import homeassistant.util.dt as dt_util
from tests.common import SetupRecorderInstanceT, mock_state_change_event
from tests.components.recorder.common import wait_recording_done
from tests.components.recorder.common import (
async_wait_recording_done,
wait_recording_done,
)
async def _async_get_states(
@ -79,7 +82,7 @@ def _add_db_entries(
entity_id=entity_id,
state="on",
attributes='{"name":"the light"}',
last_changed=point,
last_changed=None,
last_updated=point,
event_id=1001 + idx,
attributes_id=1002 + idx,
@ -785,3 +788,86 @@ async def test_get_states_query_during_migration_to_schema_25_multiple_entities(
)
assert hist[0].attributes == {"name": "the light"}
assert hist[1].attributes == {"name": "the light"}
async def test_get_full_significant_states_handles_empty_last_changed(
hass: ha.HomeAssistant,
async_setup_recorder_instance: SetupRecorderInstanceT,
):
"""Test getting states when last_changed is null."""
await async_setup_recorder_instance(hass, {})
now = dt_util.utcnow()
hass.states.async_set("sensor.one", "on", {"attr": "original"})
state0 = hass.states.get("sensor.one")
await hass.async_block_till_done()
hass.states.async_set("sensor.one", "on", {"attr": "new"})
state1 = hass.states.get("sensor.one")
assert state0.last_changed == state1.last_changed
assert state0.last_updated != state1.last_updated
await async_wait_recording_done(hass)
def _get_entries():
with session_scope(hass=hass) as session:
return history.get_full_significant_states_with_session(
hass,
session,
now,
dt_util.utcnow(),
entity_ids=["sensor.one"],
significant_changes_only=False,
)
states = await recorder.get_instance(hass).async_add_executor_job(_get_entries)
sensor_one_states: list[State] = states["sensor.one"]
assert sensor_one_states[0] == state0
assert sensor_one_states[1] == state1
assert sensor_one_states[0].last_changed == sensor_one_states[1].last_changed
assert sensor_one_states[0].last_updated != sensor_one_states[1].last_updated
def _fetch_native_states() -> list[State]:
with session_scope(hass=hass) as session:
native_states = []
db_state_attributes = {
state_attributes.attributes_id: state_attributes
for state_attributes in session.query(StateAttributes)
}
for db_state in session.query(States):
state = db_state.to_native()
state.attributes = db_state_attributes[
db_state.attributes_id
].to_native()
native_states.append(state)
return native_states
native_sensor_one_states = await recorder.get_instance(hass).async_add_executor_job(
_fetch_native_states
)
assert native_sensor_one_states[0] == state0
assert native_sensor_one_states[1] == state1
assert (
native_sensor_one_states[0].last_changed
== native_sensor_one_states[1].last_changed
)
assert (
native_sensor_one_states[0].last_updated
!= native_sensor_one_states[1].last_updated
)
def _fetch_db_states() -> list[State]:
with session_scope(hass=hass) as session:
states = list(session.query(States))
session.expunge_all()
return states
db_sensor_one_states = await recorder.get_instance(hass).async_add_executor_job(
_fetch_db_states
)
assert db_sensor_one_states[0].last_changed is None
assert (
process_timestamp(db_sensor_one_states[1].last_changed) == state0.last_changed
)
assert db_sensor_one_states[0].last_updated is not None
assert db_sensor_one_states[1].last_updated is not None
assert db_sensor_one_states[0].last_updated != db_sensor_one_states[1].last_updated

View File

@ -79,7 +79,7 @@ def test_from_event_to_delete_state():
assert db_state.entity_id == "sensor.temperature"
assert db_state.state == ""
assert db_state.last_changed == event.time_fired
assert db_state.last_changed is None
assert db_state.last_updated == event.time_fired