diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 56f5b53326c..ad160a1ba8f 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -5,12 +5,11 @@ import concurrent.futures from datetime import datetime, timedelta import logging import queue -from sqlite3 import Connection import threading import time from typing import Any, Dict, Optional -from sqlalchemy import create_engine, exc +from sqlalchemy import create_engine, exc, select from sqlalchemy.engine import Engine from sqlalchemy.event import listens_for from sqlalchemy.orm import scoped_session, sessionmaker @@ -61,6 +60,7 @@ DEFAULT_URL = "sqlite:///{hass_config_path}" DEFAULT_DB_FILE = "home-assistant_v2.db" DEFAULT_DB_MAX_RETRIES = 10 DEFAULT_DB_RETRY_WAIT = 3 +KEEPALIVE_TIME = 30 CONF_DB_URL = "db_url" CONF_DB_MAX_RETRIES = "db_max_retries" @@ -223,6 +223,7 @@ class Recorder(threading.Thread): self.exclude_t = exclude.get(CONF_EVENT_TYPES, []) self._timechanges_seen = 0 + self._keepalive_count = 0 self.event_session = None self.get_session = None @@ -353,6 +354,10 @@ class Recorder(threading.Thread): continue if event.event_type == EVENT_TIME_CHANGED: self.queue.task_done() + self._keepalive_count += 1 + if self._keepalive_count >= KEEPALIVE_TIME: + self._keepalive_count = 0 + self._send_keep_alive() if self.commit_interval: self._timechanges_seen += 1 if self._timechanges_seen >= self.commit_interval: @@ -400,6 +405,18 @@ class Recorder(threading.Thread): self.queue.task_done() + def _send_keep_alive(self): + try: + _LOGGER.debug("Sending keepalive") + self.event_session.connection().scalar(select([1])) + return + except Exception as err: # pylint: disable=broad-except + # Must catch the exception to prevent the loop from collapsing + _LOGGER.error( + "Error in database connectivity during keepalive: %s.", err, + ) + self._reopen_event_session() + def _commit_event_session_or_retry(self): tries = 1 while tries <= self.db_max_retries: @@ -419,7 +436,7 @@ class Recorder(threading.Thread): ) else: _LOGGER.error( - "Error in database connectivity: %s. " + "Error in database connectivity during commit: %s. " "(retrying in %s seconds)", err, self.db_retry_wait, @@ -435,6 +452,15 @@ class Recorder(threading.Thread): "Error in database update. Could not save " "after %d tries. Giving up", tries, ) + self._reopen_event_session() + + def _reopen_event_session(self): + try: + self.event_session.rollback() + except Exception as err: # pylint: disable=broad-except + # Must catch the exception to prevent the loop from collapsing + _LOGGER.exception("Error while rolling back event session: %s", err) + try: self.event_session.close() except Exception as err: # pylint: disable=broad-except @@ -470,15 +496,23 @@ class Recorder(threading.Thread): # pylint: disable=unused-variable @listens_for(Engine, "connect") - def set_sqlite_pragma(dbapi_connection, connection_record): - """Set sqlite's WAL mode.""" - if isinstance(dbapi_connection, Connection): + def setup_connection(dbapi_connection, connection_record): + """Dbapi specific connection settings.""" + + # We do not import sqlite3 here so mysql/other + # users do not have to pay for it to be loaded in + # memory + if self.db_url == "sqlite://" or ":memory:" in self.db_url: old_isolation = dbapi_connection.isolation_level dbapi_connection.isolation_level = None cursor = dbapi_connection.cursor() cursor.execute("PRAGMA journal_mode=WAL") cursor.close() dbapi_connection.isolation_level = old_isolation + elif self.db_url.startswith("mysql"): + cursor = dbapi_connection.cursor() + cursor.execute("SET session wait_timeout=28800") + cursor.close() if self.db_url == "sqlite://" or ":memory:" in self.db_url: kwargs["connect_args"] = {"check_same_thread": False}