Fix recorder shutdown race and i/o in event loop (#54979)

This commit is contained in:
J. Nick Koston 2021-08-21 14:38:02 -05:00 committed by GitHub
parent ebb8ad308e
commit 8d69475d71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,6 +14,7 @@ from typing import Any, Callable, NamedTuple
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.session import Session
from sqlalchemy.pool import StaticPool
import voluptuous as vol
@ -568,8 +569,13 @@ class Recorder(threading.Thread):
start = statistics.get_start_time()
self.queue.put(StatisticsTask(start))
@callback
def _async_setup_periodic_tasks(self):
"""Prepare periodic tasks."""
if self.hass.is_stopping or not self.get_session:
# Home Assistant is shutting down
return
# Run nightly tasks at 4:12am
async_track_time_change(
self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0
@ -580,29 +586,6 @@ class Recorder(threading.Thread):
self.hass, self.async_hourly_statistics, minute=12, second=0
)
# Add tasks for missing statistics runs
now = dt_util.utcnow()
last_hour = now.replace(minute=0, second=0, microsecond=0)
start = now - timedelta(days=self.keep_days)
start = start.replace(minute=0, second=0, microsecond=0)
if not self.get_session:
# Home Assistant is shutting down
return
# Find the newest statistics run, if any
with session_scope(session=self.get_session()) as session:
last_run = session.query(func.max(StatisticsRuns.start)).scalar()
if last_run:
start = max(start, process_timestamp(last_run) + timedelta(hours=1))
# Add tasks
while start < last_hour:
end = start + timedelta(hours=1)
_LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
self.queue.put(StatisticsTask(start))
start = start + timedelta(hours=1)
def run(self):
"""Start processing events to save."""
shutdown_task = object()
@ -996,7 +979,7 @@ class Recorder(threading.Thread):
self.get_session = None
def _setup_run(self):
"""Log the start of the current run."""
"""Log the start of the current run and schedule any needed jobs."""
with session_scope(session=self.get_session()) as session:
start = self.recording_start
end_incomplete_runs(session, start)
@ -1004,9 +987,28 @@ class Recorder(threading.Thread):
session.add(self.run_info)
session.flush()
session.expunge(self.run_info)
self._schedule_compile_missing_statistics(session)
self._open_event_session()
def _schedule_compile_missing_statistics(self, session: Session) -> None:
"""Add tasks for missing statistics runs."""
now = dt_util.utcnow()
last_hour = now.replace(minute=0, second=0, microsecond=0)
start = now - timedelta(days=self.keep_days)
start = start.replace(minute=0, second=0, microsecond=0)
# Find the newest statistics run, if any
if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
start = max(start, process_timestamp(last_run) + timedelta(hours=1))
# Add tasks
while start < last_hour:
end = start + timedelta(hours=1)
_LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
self.queue.put(StatisticsTask(start))
start = start + timedelta(hours=1)
def _end_session(self):
"""End the recorder session."""
if self.event_session is None: