From ebed1de58167f6e5d89689d5d7a92729826cf1f6 Mon Sep 17 00:00:00 2001
From: "J. Nick Koston"
Date: Tue, 19 May 2020 00:52:38 -0500
Subject: [PATCH] Avoid creating multiple sqlalchemy sessions in a single
 history call (#35721)

* Avoid a context switch in the history API

The history API was creating one job to fetch the states and another job
to convert the states to JSON. This can be done in a single job, which
decreases the overhead of the operation.

* Ensure there is only one sqlalchemy session created per history query.

Most queries created three sqlalchemy sessions, which was especially slow
with sqlite since it opens and closes the database for each session. In
testing, the UI is noticeably faster at generating history graphs for
entities.

* Add additional coverage

* pass hass first to _states_to_json and _get_significant_states
---
 homeassistant/components/history/__init__.py  | 235 +++++++++++-------
 homeassistant/components/recorder/__init__.py |  39 ++-
 tests/components/history/test_init.py         |   5 +
 3 files changed, 172 insertions(+), 107 deletions(-)

diff --git a/homeassistant/components/history/__init__.py b/homeassistant/components/history/__init__.py
index 8d2f9baab81..7538764dcb8 100644
--- a/homeassistant/components/history/__init__.py
+++ b/homeassistant/components/history/__init__.py
@@ -43,8 +43,15 @@ SIGNIFICANT_DOMAINS = ("climate", "device_tracker", "thermostat", "water_heater"
 IGNORE_DOMAINS = ("zone", "scene")
 
 
-def get_significant_states(
+def get_significant_states(hass, *args, **kwargs):
+    """Wrap _get_significant_states with a sql session."""
+    with session_scope(hass=hass) as session:
+        return _get_significant_states(hass, session, *args, **kwargs)
+
+
+def _get_significant_states(
     hass,
+    session,
     start_time,
     end_time=None,
     entity_ids=None,
@@ -61,38 +68,43 @@ def get_significant_states(
     """
     timer_start = time.perf_counter()
 
-    with session_scope(hass=hass) as session:
-        if significant_changes_only:
-            query = session.query(States).filter(
-                (
-                    States.domain.in_(SIGNIFICANT_DOMAINS)
-                    | (States.last_changed == States.last_updated)
-                )
-                & (States.last_updated > start_time)
+    if significant_changes_only:
+        query = session.query(States).filter(
+            (
+                States.domain.in_(SIGNIFICANT_DOMAINS)
+                | (States.last_changed == States.last_updated)
             )
-        else:
-            query = session.query(States).filter(States.last_updated > start_time)
-
-        if filters:
-            query = filters.apply(query, entity_ids)
-
-        if end_time is not None:
-            query = query.filter(States.last_updated < end_time)
-
-        query = query.order_by(States.last_updated)
-
-        states = (
-            state
-            for state in execute(query)
-            if (_is_significant(state) and not state.attributes.get(ATTR_HIDDEN, False))
+            & (States.last_updated > start_time)
         )
+    else:
+        query = session.query(States).filter(States.last_updated > start_time)
+
+    if filters:
+        query = filters.apply(query, entity_ids)
+
+    if end_time is not None:
+        query = query.filter(States.last_updated < end_time)
+
+    query = query.order_by(States.last_updated)
+
+    states = (
+        state
+        for state in execute(query)
+        if (_is_significant(state) and not state.attributes.get(ATTR_HIDDEN, False))
+    )
 
     if _LOGGER.isEnabledFor(logging.DEBUG):
         elapsed = time.perf_counter() - timer_start
        _LOGGER.debug("get_significant_states took %fs", elapsed)
 
-    return states_to_json(
-        hass, states, start_time, entity_ids, filters, include_start_time_state
+    return _states_to_json(
+        hass,
+        session,
+        states,
+        start_time,
+        entity_ids,
+        filters,
+        include_start_time_state,
     )
 
 
@@ -115,7 +127,7 @@ def state_changes_during_period(hass, start_time, end_time=None, entity_id=None)
 
         states = execute(query.order_by(States.last_updated))
 
-    return states_to_json(hass, states, start_time, entity_ids)
+    return _states_to_json(hass, session, states, start_time, entity_ids)
 
 
 def get_last_state_changes(hass, number_of_states, entity_id):
@@ -135,91 +147,117 @@ def get_last_state_changes(hass, number_of_states, entity_id):
             query.order_by(States.last_updated.desc()).limit(number_of_states)
         )
 
-    return states_to_json(
-        hass, reversed(states), start_time, entity_ids, include_start_time_state=False
-    )
+    return _states_to_json(
+        hass,
+        session,
+        reversed(states),
+        start_time,
+        entity_ids,
+        include_start_time_state=False,
+    )
 
 
 def get_states(hass, utc_point_in_time, entity_ids=None, run=None, filters=None):
     """Return the states at a specific point in time."""
     if run is None:
-        run = recorder.run_information(hass, utc_point_in_time)
+        run = recorder.run_information_from_instance(hass, utc_point_in_time)
 
         # History did not run before utc_point_in_time
        if run is None:
            return []
 
     with session_scope(hass=hass) as session:
-        query = session.query(States)
+        return _get_states_with_session(
+            session, utc_point_in_time, entity_ids, run, filters
+        )
 
-        if entity_ids and len(entity_ids) == 1:
-            # Use an entirely different (and extremely fast) query if we only
-            # have a single entity id
-            query = (
-                query.filter(
-                    States.last_updated >= run.start,
-                    States.last_updated < utc_point_in_time,
-                    States.entity_id.in_(entity_ids),
-                )
-                .order_by(States.last_updated.desc())
-                .limit(1)
+
+
+def _get_states_with_session(
+    session, utc_point_in_time, entity_ids=None, run=None, filters=None
+):
+    """Return the states at a specific point in time."""
+    if run is None:
+        run = recorder.run_information_with_session(session, utc_point_in_time)
+
+    # History did not run before utc_point_in_time
+    if run is None:
+        return []
+
+    query = session.query(States)
+
+    if entity_ids and len(entity_ids) == 1:
+        # Use an entirely different (and extremely fast) query if we only
+        # have a single entity id
+        query = (
+            query.filter(
+                States.last_updated >= run.start,
+                States.last_updated < utc_point_in_time,
+                States.entity_id.in_(entity_ids),
            )
+            .order_by(States.last_updated.desc())
+            .limit(1)
+        )
 
-        else:
-            # We have more than one entity to look at (most commonly we want
-            # all entities,) so we need to do a search on all states since the
-            # last recorder run started.
+    else:
+        # We have more than one entity to look at (most commonly we want
+        # all entities,) so we need to do a search on all states since the
+        # last recorder run started.
-            most_recent_states_by_date = session.query(
-                States.entity_id.label("max_entity_id"),
-                func.max(States.last_updated).label("max_last_updated"),
-            ).filter(
-                (States.last_updated >= run.start)
-                & (States.last_updated < utc_point_in_time)
-            )
+        most_recent_states_by_date = session.query(
+            States.entity_id.label("max_entity_id"),
+            func.max(States.last_updated).label("max_last_updated"),
+        ).filter(
+            (States.last_updated >= run.start)
+            & (States.last_updated < utc_point_in_time)
+        )
 
-            if entity_ids:
-                most_recent_states_by_date.filter(States.entity_id.in_(entity_ids))
+        if entity_ids:
+            most_recent_states_by_date.filter(States.entity_id.in_(entity_ids))
 
-            most_recent_states_by_date = most_recent_states_by_date.group_by(
-                States.entity_id
-            )
+        most_recent_states_by_date = most_recent_states_by_date.group_by(
+            States.entity_id
+        )
 
-            most_recent_states_by_date = most_recent_states_by_date.subquery()
+        most_recent_states_by_date = most_recent_states_by_date.subquery()
 
-            most_recent_state_ids = session.query(
-                func.max(States.state_id).label("max_state_id")
-            ).join(
-                most_recent_states_by_date,
-                and_(
-                    States.entity_id == most_recent_states_by_date.c.max_entity_id,
-                    States.last_updated
-                    == most_recent_states_by_date.c.max_last_updated,
-                ),
-            )
+        most_recent_state_ids = session.query(
+            func.max(States.state_id).label("max_state_id")
+        ).join(
+            most_recent_states_by_date,
+            and_(
+                States.entity_id == most_recent_states_by_date.c.max_entity_id,
+                States.last_updated == most_recent_states_by_date.c.max_last_updated,
+            ),
+        )
 
-            most_recent_state_ids = most_recent_state_ids.group_by(States.entity_id)
+        most_recent_state_ids = most_recent_state_ids.group_by(States.entity_id)
 
-            most_recent_state_ids = most_recent_state_ids.subquery()
+        most_recent_state_ids = most_recent_state_ids.subquery()
 
-            query = query.join(
-                most_recent_state_ids,
-                States.state_id == most_recent_state_ids.c.max_state_id,
-            ).filter(~States.domain.in_(IGNORE_DOMAINS))
+        query = query.join(
+            most_recent_state_ids,
+            States.state_id == most_recent_state_ids.c.max_state_id,
+        ).filter(~States.domain.in_(IGNORE_DOMAINS))
 
-        if filters:
-            query = filters.apply(query, entity_ids)
+    if filters:
+        query = filters.apply(query, entity_ids)
 
-        return [
-            state
-            for state in execute(query)
-            if not state.attributes.get(ATTR_HIDDEN, False)
-        ]
+    return [
+        state
+        for state in execute(query)
+        if not state.attributes.get(ATTR_HIDDEN, False)
+    ]
 
 
-def states_to_json(
-    hass, states, start_time, entity_ids, filters=None, include_start_time_state=True
+def _states_to_json(
+    hass,
+    session,
+    states,
+    start_time,
+    entity_ids,
+    filters=None,
+    include_start_time_state=True,
 ):
     """Convert SQL results into JSON friendly data structure.
@@ -239,7 +277,10 @@ def states_to_json(
     # Get the states at the start time
     timer_start = time.perf_counter()
     if include_start_time_state:
-        for state in get_states(hass, start_time, entity_ids, filters=filters):
+        run = recorder.run_information_from_instance(hass, start_time)
+        for state in _get_states_with_session(
+            session, start_time, entity_ids, run=run, filters=filters
+        ):
             state.last_changed = start_time
             state.last_updated = start_time
             result[state.entity_id].append(state)
@@ -298,6 +339,7 @@ class HistoryPeriodView(HomeAssistantView):
 
     async def get(self, request, datetime=None):
         """Return history over a period of time."""
+
         if datetime:
             datetime = dt_util.parse_datetime(datetime)
 
@@ -356,15 +398,18 @@ class HistoryPeriodView(HomeAssistantView):
         """Fetch significant stats from the database as json."""
         timer_start = time.perf_counter()
 
-        result = get_significant_states(
-            hass,
-            start_time,
-            end_time,
-            entity_ids,
-            self.filters,
-            include_start_time_state,
-            significant_changes_only,
-        )
+        with session_scope(hass=hass) as session:
+            result = _get_significant_states(
+                hass,
+                session,
+                start_time,
+                end_time,
+                entity_ids,
+                self.filters,
+                include_start_time_state,
+                significant_changes_only,
+            )
+        result = list(result.values())
 
         if _LOGGER.isEnabledFor(logging.DEBUG):
             elapsed = time.perf_counter() - timer_start
diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index fcccaa2fb9f..8cceedb3985 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -123,24 +123,39 @@ def run_information(hass, point_in_time: Optional[datetime] = None):
 
     There is also the run that covers point_in_time.
     """
+    run_info = run_information_from_instance(hass, point_in_time)
+    if run_info:
+        return run_info
+
+    with session_scope(hass=hass) as session:
+        return run_information_with_session(session, point_in_time)
+
+
+def run_information_from_instance(hass, point_in_time: Optional[datetime] = None):
+    """Return information about current run from the existing instance.
+
+    Does not query the database for older runs.
+ """ ins = hass.data[DATA_INSTANCE] - recorder_runs = RecorderRuns if point_in_time is None or point_in_time > ins.recording_start: return ins.run_info - with session_scope(hass=hass) as session: - res = ( - session.query(recorder_runs) - .filter( - (recorder_runs.start < point_in_time) - & (recorder_runs.end > point_in_time) - ) - .first() + +def run_information_with_session(session, point_in_time: Optional[datetime] = None): + """Return information about current run from the database.""" + recorder_runs = RecorderRuns + + res = ( + session.query(recorder_runs) + .filter( + (recorder_runs.start < point_in_time) & (recorder_runs.end > point_in_time) ) - if res: - session.expunge(res) - return res + .first() + ) + if res: + session.expunge(res) + return res async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: diff --git a/tests/components/history/test_init.py b/tests/components/history/test_init.py index 29e43c8428e..16af2c64271 100644 --- a/tests/components/history/test_init.py +++ b/tests/components/history/test_init.py @@ -103,6 +103,11 @@ class TestComponentHistory(unittest.TestCase): # Test get_state here because we have a DB setup assert states[0] == history.get_state(self.hass, future, states[0].entity_id) + time_before_recorder_ran = now - timedelta(days=1000) + assert history.get_states(self.hass, time_before_recorder_ran) == [] + + assert history.get_state(self.hass, time_before_recorder_ran, "demo.id") is None + def test_state_changes_during_period(self): """Test state change during period.""" self.init_recorder()