diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 897a4eb3c94..17215eb9845 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -14,6 +14,7 @@ from typing import Any, Callable, NamedTuple from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.orm.session import Session from sqlalchemy.pool import StaticPool import voluptuous as vol @@ -568,8 +569,13 @@ class Recorder(threading.Thread): start = statistics.get_start_time() self.queue.put(StatisticsTask(start)) + @callback def _async_setup_periodic_tasks(self): """Prepare periodic tasks.""" + if self.hass.is_stopping or not self.get_session: + # Home Assistant is shutting down + return + # Run nightly tasks at 4:12am async_track_time_change( self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0 @@ -580,29 +586,6 @@ class Recorder(threading.Thread): self.hass, self.async_hourly_statistics, minute=12, second=0 ) - # Add tasks for missing statistics runs - now = dt_util.utcnow() - last_hour = now.replace(minute=0, second=0, microsecond=0) - start = now - timedelta(days=self.keep_days) - start = start.replace(minute=0, second=0, microsecond=0) - - if not self.get_session: - # Home Assistant is shutting down - return - - # Find the newest statistics run, if any - with session_scope(session=self.get_session()) as session: - last_run = session.query(func.max(StatisticsRuns.start)).scalar() - if last_run: - start = max(start, process_timestamp(last_run) + timedelta(hours=1)) - - # Add tasks - while start < last_hour: - end = start + timedelta(hours=1) - _LOGGER.debug("Compiling missing statistics for %s-%s", start, end) - self.queue.put(StatisticsTask(start)) - start = start + timedelta(hours=1) - def run(self): """Start processing events to save.""" shutdown_task = object() @@ -996,7 +979,7 @@ class Recorder(threading.Thread): self.get_session = None def _setup_run(self): - """Log the start of the current run.""" + """Log the start of the current run and schedule any needed jobs.""" with session_scope(session=self.get_session()) as session: start = self.recording_start end_incomplete_runs(session, start) @@ -1004,9 +987,28 @@ class Recorder(threading.Thread): session.add(self.run_info) session.flush() session.expunge(self.run_info) + self._schedule_compile_missing_statistics(session) self._open_event_session() + def _schedule_compile_missing_statistics(self, session: Session) -> None: + """Add tasks for missing statistics runs.""" + now = dt_util.utcnow() + last_hour = now.replace(minute=0, second=0, microsecond=0) + start = now - timedelta(days=self.keep_days) + start = start.replace(minute=0, second=0, microsecond=0) + + # Find the newest statistics run, if any + if last_run := session.query(func.max(StatisticsRuns.start)).scalar(): + start = max(start, process_timestamp(last_run) + timedelta(hours=1)) + + # Add tasks + while start < last_hour: + end = start + timedelta(hours=1) + _LOGGER.debug("Compiling missing statistics for %s-%s", start, end) + self.queue.put(StatisticsTask(start)) + start = start + timedelta(hours=1) + def _end_session(self): """End the recorder session.""" if self.event_session is None: