Mirror of https://github.com/home-assistant/core.git, synced 2025-07-15 17:27:10 +00:00
Avoid creating multiple sqlalchemy sessions in a single history call (#35721)
* Avoid a context switch in the history api

  The history api was creating a job to fetch the states and another job to convert the states to json. This can be done in a single job, which decreases the overhead of the operation.

* Ensure there is only one sqlalchemy session created per history query

  Most queries created three sqlalchemy sessions, which was especially slow with sqlite since it opens and closes the database. In testing, the UI is noticeably faster at generating history graphs for entities.

* Add additional coverage

* Pass hass first to _states_to_json and _get_significant_states
This commit is contained in:
parent aeae4edb74
commit ebed1de581
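
Not part of the commit: a minimal, self-contained sketch of the pattern the message describes. The public function owns exactly one session and hands it to a private worker, so nothing downstream opens its own. The session_scope stand-in below is modeled on the recorder's context manager; every name here is illustrative.

    from contextlib import contextmanager

    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    engine = create_engine("sqlite://")  # in-memory stand-in database
    Session = sessionmaker(bind=engine)


    @contextmanager
    def session_scope():
        """Transactional scope, modeled on the recorder's helper."""
        session = Session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()


    def get_significant_states(*args, **kwargs):
        """Public API: opens exactly one session per call."""
        with session_scope() as session:
            return _get_significant_states(session, *args, **kwargs)


    def _get_significant_states(session, start_time, end_time=None):
        """Private worker: reuses the caller's session, never opens its own."""
        # The real implementation builds its States query on `session` here.
        return []
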
@@ -43,8 +43,15 @@ SIGNIFICANT_DOMAINS = ("climate", "device_tracker", "thermostat", "water_heater"
 IGNORE_DOMAINS = ("zone", "scene")
 
 
-def get_significant_states(
+def get_significant_states(hass, *args, **kwargs):
+    """Wrap _get_significant_states with a sql session."""
+    with session_scope(hass=hass) as session:
+        return _get_significant_states(hass, session, *args, **kwargs)
+
+
+def _get_significant_states(
     hass,
+    session,
     start_time,
     end_time=None,
     entity_ids=None,
@@ -61,38 +68,43 @@ def get_significant_states(
     """
     timer_start = time.perf_counter()
 
-    with session_scope(hass=hass) as session:
-        if significant_changes_only:
-            query = session.query(States).filter(
-                (
-                    States.domain.in_(SIGNIFICANT_DOMAINS)
-                    | (States.last_changed == States.last_updated)
-                )
-                & (States.last_updated > start_time)
-            )
-        else:
-            query = session.query(States).filter(States.last_updated > start_time)
-
-        if filters:
-            query = filters.apply(query, entity_ids)
-
-        if end_time is not None:
-            query = query.filter(States.last_updated < end_time)
-
-        query = query.order_by(States.last_updated)
-
-        states = (
-            state
-            for state in execute(query)
-            if (_is_significant(state) and not state.attributes.get(ATTR_HIDDEN, False))
-        )
+    if significant_changes_only:
+        query = session.query(States).filter(
+            (
+                States.domain.in_(SIGNIFICANT_DOMAINS)
+                | (States.last_changed == States.last_updated)
+            )
+            & (States.last_updated > start_time)
+        )
+    else:
+        query = session.query(States).filter(States.last_updated > start_time)
+
+    if filters:
+        query = filters.apply(query, entity_ids)
+
+    if end_time is not None:
+        query = query.filter(States.last_updated < end_time)
+
+    query = query.order_by(States.last_updated)
+
+    states = (
+        state
+        for state in execute(query)
+        if (_is_significant(state) and not state.attributes.get(ATTR_HIDDEN, False))
+    )
 
     if _LOGGER.isEnabledFor(logging.DEBUG):
         elapsed = time.perf_counter() - timer_start
         _LOGGER.debug("get_significant_states took %fs", elapsed)
 
-    return states_to_json(
-        hass, states, start_time, entity_ids, filters, include_start_time_state
+    return _states_to_json(
+        hass,
+        session,
+        states,
+        start_time,
+        entity_ids,
+        filters,
+        include_start_time_state,
     )
 
 
@@ -115,7 +127,7 @@ def state_changes_during_period(hass, start_time, end_time=None, entity_id=None):
 
         states = execute(query.order_by(States.last_updated))
 
-    return states_to_json(hass, states, start_time, entity_ids)
+        return _states_to_json(hass, session, states, start_time, entity_ids)
 
 
 def get_last_state_changes(hass, number_of_states, entity_id):
@@ -135,91 +147,117 @@ def get_last_state_changes(hass, number_of_states, entity_id):
             query.order_by(States.last_updated.desc()).limit(number_of_states)
         )
 
-    return states_to_json(
-        hass, reversed(states), start_time, entity_ids, include_start_time_state=False
-    )
+        return _states_to_json(
+            hass,
+            session,
+            reversed(states),
+            start_time,
+            entity_ids,
+            include_start_time_state=False,
+        )
 
 
 def get_states(hass, utc_point_in_time, entity_ids=None, run=None, filters=None):
     """Return the states at a specific point in time."""
     if run is None:
-        run = recorder.run_information(hass, utc_point_in_time)
+        run = recorder.run_information_from_instance(hass, utc_point_in_time)
 
         # History did not run before utc_point_in_time
         if run is None:
             return []
 
     with session_scope(hass=hass) as session:
-        query = session.query(States)
+        return _get_states_with_session(
+            session, utc_point_in_time, entity_ids, run, filters
+        )
 
-        if entity_ids and len(entity_ids) == 1:
-            # Use an entirely different (and extremely fast) query if we only
-            # have a single entity id
-            query = (
-                query.filter(
-                    States.last_updated >= run.start,
-                    States.last_updated < utc_point_in_time,
-                    States.entity_id.in_(entity_ids),
-                )
-                .order_by(States.last_updated.desc())
-                .limit(1)
-            )
+
+def _get_states_with_session(
+    session, utc_point_in_time, entity_ids=None, run=None, filters=None
+):
+    """Return the states at a specific point in time."""
+    if run is None:
+        run = recorder.run_information_with_session(session, utc_point_in_time)
+
+        # History did not run before utc_point_in_time
+        if run is None:
+            return []
+
+    query = session.query(States)
+
+    if entity_ids and len(entity_ids) == 1:
+        # Use an entirely different (and extremely fast) query if we only
+        # have a single entity id
+        query = (
+            query.filter(
+                States.last_updated >= run.start,
+                States.last_updated < utc_point_in_time,
+                States.entity_id.in_(entity_ids),
+            )
+            .order_by(States.last_updated.desc())
+            .limit(1)
+        )
 
-        else:
-            # We have more than one entity to look at (most commonly we want
-            # all entities,) so we need to do a search on all states since the
-            # last recorder run started.
+    else:
+        # We have more than one entity to look at (most commonly we want
+        # all entities,) so we need to do a search on all states since the
+        # last recorder run started.
 
-            most_recent_states_by_date = session.query(
-                States.entity_id.label("max_entity_id"),
-                func.max(States.last_updated).label("max_last_updated"),
-            ).filter(
-                (States.last_updated >= run.start)
-                & (States.last_updated < utc_point_in_time)
-            )
+        most_recent_states_by_date = session.query(
+            States.entity_id.label("max_entity_id"),
+            func.max(States.last_updated).label("max_last_updated"),
+        ).filter(
+            (States.last_updated >= run.start)
+            & (States.last_updated < utc_point_in_time)
+        )
 
-            if entity_ids:
-                most_recent_states_by_date.filter(States.entity_id.in_(entity_ids))
+        if entity_ids:
+            most_recent_states_by_date.filter(States.entity_id.in_(entity_ids))
 
-            most_recent_states_by_date = most_recent_states_by_date.group_by(
-                States.entity_id
-            )
+        most_recent_states_by_date = most_recent_states_by_date.group_by(
+            States.entity_id
+        )
 
-            most_recent_states_by_date = most_recent_states_by_date.subquery()
+        most_recent_states_by_date = most_recent_states_by_date.subquery()
 
-            most_recent_state_ids = session.query(
-                func.max(States.state_id).label("max_state_id")
-            ).join(
-                most_recent_states_by_date,
-                and_(
-                    States.entity_id == most_recent_states_by_date.c.max_entity_id,
-                    States.last_updated
-                    == most_recent_states_by_date.c.max_last_updated,
-                ),
-            )
+        most_recent_state_ids = session.query(
+            func.max(States.state_id).label("max_state_id")
+        ).join(
+            most_recent_states_by_date,
+            and_(
+                States.entity_id == most_recent_states_by_date.c.max_entity_id,
+                States.last_updated == most_recent_states_by_date.c.max_last_updated,
+            ),
+        )
 
-            most_recent_state_ids = most_recent_state_ids.group_by(States.entity_id)
+        most_recent_state_ids = most_recent_state_ids.group_by(States.entity_id)
 
-            most_recent_state_ids = most_recent_state_ids.subquery()
+        most_recent_state_ids = most_recent_state_ids.subquery()
 
-            query = query.join(
-                most_recent_state_ids,
-                States.state_id == most_recent_state_ids.c.max_state_id,
-            ).filter(~States.domain.in_(IGNORE_DOMAINS))
+        query = query.join(
+            most_recent_state_ids,
+            States.state_id == most_recent_state_ids.c.max_state_id,
+        ).filter(~States.domain.in_(IGNORE_DOMAINS))
 
-        if filters:
-            query = filters.apply(query, entity_ids)
+    if filters:
+        query = filters.apply(query, entity_ids)
 
-        return [
-            state
-            for state in execute(query)
-            if not state.attributes.get(ATTR_HIDDEN, False)
-        ]
+    return [
+        state
+        for state in execute(query)
+        if not state.attributes.get(ATTR_HIDDEN, False)
+    ]
 
 
-def states_to_json(
-    hass, states, start_time, entity_ids, filters=None, include_start_time_state=True
+def _states_to_json(
+    hass,
+    session,
+    states,
+    start_time,
+    entity_ids,
+    filters=None,
+    include_start_time_state=True,
 ):
     """Convert SQL results into JSON friendly data structure.
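
An aside on the multi-entity branch above: it is the classic greatest-n-per-group query, a subquery of max(last_updated) per entity_id joined back to fetch the matching rows. Below is a simplified, self-contained sketch of that shape (toy table, single join; the real code adds a second max(state_id) pass and the IGNORE_DOMAINS filter).

    from sqlalchemy import Column, DateTime, Integer, String, and_, create_engine, func
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()


    class State(Base):  # toy stand-in for the recorder's States table
        __tablename__ = "states"
        state_id = Column(Integer, primary_key=True)
        entity_id = Column(String)
        last_updated = Column(DateTime)


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        # Subquery: newest timestamp per entity.
        latest = (
            session.query(
                State.entity_id.label("max_entity_id"),
                func.max(State.last_updated).label("max_last_updated"),
            )
            .group_by(State.entity_id)
            .subquery()
        )
        # Join back to fetch the full row for each per-entity maximum.
        rows = (
            session.query(State)
            .join(
                latest,
                and_(
                    State.entity_id == latest.c.max_entity_id,
                    State.last_updated == latest.c.max_last_updated,
                ),
            )
            .all()
        )
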
@@ -239,7 +277,10 @@ def states_to_json(
     # Get the states at the start time
     timer_start = time.perf_counter()
     if include_start_time_state:
-        for state in get_states(hass, start_time, entity_ids, filters=filters):
+        run = recorder.run_information_from_instance(hass, start_time)
+        for state in _get_states_with_session(
+            session, start_time, entity_ids, run=run, filters=filters
+        ):
             state.last_changed = start_time
             state.last_updated = start_time
             result[state.entity_id].append(state)
@@ -298,6 +339,7 @@ class HistoryPeriodView(HomeAssistantView):
 
     async def get(self, request, datetime=None):
         """Return history over a period of time."""
+
         if datetime:
             datetime = dt_util.parse_datetime(datetime)
 
@@ -356,15 +398,18 @@ class HistoryPeriodView(HomeAssistantView):
         """Fetch significant stats from the database as json."""
         timer_start = time.perf_counter()
 
-        result = get_significant_states(
-            hass,
-            start_time,
-            end_time,
-            entity_ids,
-            self.filters,
-            include_start_time_state,
-            significant_changes_only,
-        )
+        with session_scope(hass=hass) as session:
+            result = _get_significant_states(
+                hass,
+                session,
+                start_time,
+                end_time,
+                entity_ids,
+                self.filters,
+                include_start_time_state,
+                significant_changes_only,
+            )
+
         result = list(result.values())
         if _LOGGER.isEnabledFor(logging.DEBUG):
             elapsed = time.perf_counter() - timer_start
@@ -123,24 +123,39 @@ def run_information(hass, point_in_time: Optional[datetime] = None):
 
     There is also the run that covers point_in_time.
     """
-    ins = hass.data[DATA_INSTANCE]
+    run_info = run_information_from_instance(hass, point_in_time)
+    if run_info:
+        return run_info
 
-    recorder_runs = RecorderRuns
+    with session_scope(hass=hass) as session:
+        return run_information_with_session(session, point_in_time)
+
+
+def run_information_from_instance(hass, point_in_time: Optional[datetime] = None):
+    """Return information about current run from the existing instance.
+
+    Does not query the database for older runs.
+    """
+    ins = hass.data[DATA_INSTANCE]
+
     if point_in_time is None or point_in_time > ins.recording_start:
         return ins.run_info
 
-    with session_scope(hass=hass) as session:
-        res = (
-            session.query(recorder_runs)
-            .filter(
-                (recorder_runs.start < point_in_time)
-                & (recorder_runs.end > point_in_time)
-            )
-            .first()
-        )
-        if res:
-            session.expunge(res)
-        return res
+
+def run_information_with_session(session, point_in_time: Optional[datetime] = None):
+    """Return information about current run from the database."""
+    recorder_runs = RecorderRuns
+
+    res = (
+        session.query(recorder_runs)
+        .filter(
+            (recorder_runs.start < point_in_time) & (recorder_runs.end > point_in_time)
+        )
+        .first()
+    )
+    if res:
+        session.expunge(res)
+    return res
 
 
 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
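
A note on session.expunge(res) in run_information_with_session above: the likely reason is that the recorder's session_scope commits on exit, a commit expires loaded attributes, and reading expired attributes on a detached row raises DetachedInstanceError. A toy, self-contained sketch of the idiom (stand-in model, not the recorder schema):

    from sqlalchemy import Column, Integer, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()


    class Run(Base):  # toy stand-in for RecorderRuns
        __tablename__ = "runs"
        run_id = Column(Integer, primary_key=True)


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)


    def run_information():
        with Session(engine) as session:
            res = session.query(Run).first()
            if res:
                # Detach the row before the commit: commit would expire its
                # attributes, and reading them later on a detached instance
                # raises DetachedInstanceError.
                session.expunge(res)
            session.commit()
            return res
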
@@ -103,6 +103,11 @@ class TestComponentHistory(unittest.TestCase):
         # Test get_state here because we have a DB setup
         assert states[0] == history.get_state(self.hass, future, states[0].entity_id)
 
+        time_before_recorder_ran = now - timedelta(days=1000)
+        assert history.get_states(self.hass, time_before_recorder_ran) == []
+
+        assert history.get_state(self.hass, time_before_recorder_ran, "demo.id") is None
+
     def test_state_changes_during_period(self):
         """Test state change during period."""
         self.init_recorder()
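
For callers the public history API is unchanged; only the session handling behind it is consolidated. A usage sketch, assuming a running instance hass with the recorder set up (as in the tests above); light.kitchen is a placeholder entity id:

    from datetime import timedelta

    from homeassistant.components import history
    import homeassistant.util.dt as dt_util

    start_time = dt_util.utcnow() - timedelta(hours=1)

    # Each call now opens a single recorder session internally.
    graph_data = history.get_significant_states(hass, start_time)
    recent = history.get_last_state_changes(hass, 5, "light.kitchen")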