Recorder typing & ensure DB ready on query (#2680)

* Recorder typing & wait on DB ready
This commit is contained in:
Johann Kellerman 2016-07-31 22:56:57 +02:00 committed by GitHub
parent 2871ab6bb0
commit e9bd5d54ad
2 changed files with 49 additions and 25 deletions

View File

@ -11,10 +11,13 @@ import logging
import queue import queue
import threading import threading
import time import time
from datetime import timedelta from datetime import timedelta, datetime
from typing import Any, Union, Optional, List
import voluptuous as vol import voluptuous as vol
from homeassistant.helpers.typing import (ConfigType, QueryType,
HomeAssistantType)
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.const import (EVENT_HOMEASSISTANT_START, from homeassistant.const import (EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED,
@ -44,15 +47,16 @@ CONFIG_SCHEMA = vol.Schema({
}) })
}, extra=vol.ALLOW_EXTRA) }, extra=vol.ALLOW_EXTRA)
_INSTANCE = None _INSTANCE = None # type: Any
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# These classes will be populated during setup() # These classes will be populated during setup()
# pylint: disable=invalid-name # pylint: disable=invalid-name,no-member
Session = None Session = None # pylint: disable=no-member
def execute(q): def execute(q: QueryType) \
-> List[Any]: # pylint: disable=invalid-sequence-index
"""Query the database and convert the objects to HA native form. """Query the database and convert the objects to HA native form.
This method also retries a few times in the case of stale connections. This method also retries a few times in the case of stale connections.
@ -68,11 +72,11 @@ def execute(q):
except sqlalchemy.exc.SQLAlchemyError as e: except sqlalchemy.exc.SQLAlchemyError as e:
log_error(e, retry_wait=QUERY_RETRY_WAIT, rollback=True) log_error(e, retry_wait=QUERY_RETRY_WAIT, rollback=True)
finally: finally:
Session().close() Session.close()
return [] return []
def run_information(point_in_time=None): def run_information(point_in_time: Optional[datetime]=None):
"""Return information about current run. """Return information about current run.
There is also the run that covers point_in_time. There is also the run that covers point_in_time.
@ -91,7 +95,7 @@ def run_information(point_in_time=None):
(recorder_runs.end > point_in_time)).first() (recorder_runs.end > point_in_time)).first()
def setup(hass, config): def setup(hass: HomeAssistantType, config: ConfigType) -> bool:
"""Setup the recorder.""" """Setup the recorder."""
# pylint: disable=global-statement # pylint: disable=global-statement
global _INSTANCE global _INSTANCE
@ -112,30 +116,36 @@ def setup(hass, config):
return True return True
def query(model_name, *args): def query(model_name: Union[str, Any], *args) -> QueryType:
"""Helper to return a query handle.""" """Helper to return a query handle."""
_verify_instance()
if isinstance(model_name, str): if isinstance(model_name, str):
return Session().query(get_model(model_name), *args) return Session.query(get_model(model_name), *args)
return Session().query(model_name, *args) return Session.query(model_name, *args)
def get_model(model_name): def get_model(model_name: str) -> Any:
"""Get a model class.""" """Get a model class."""
from homeassistant.components.recorder import models from homeassistant.components.recorder import models
try:
return getattr(models, model_name) return getattr(models, model_name)
except AttributeError:
_LOGGER.error("Invalid model name %s", model_name)
return None
def log_error(e, retry_wait=0, rollback=True, def log_error(e: Exception, retry_wait: Optional[float]=0,
message="Error during query: %s"): rollback: Optional[bool]=True,
message: Optional[str]="Error during query: %s") -> None:
"""Log about SQLAlchemy errors in a sane manner.""" """Log about SQLAlchemy errors in a sane manner."""
import sqlalchemy.exc import sqlalchemy.exc
if not isinstance(e, sqlalchemy.exc.OperationalError): if not isinstance(e, sqlalchemy.exc.OperationalError):
_LOGGER.exception(e) _LOGGER.exception(str(e))
else: else:
_LOGGER.error(message, str(e)) _LOGGER.error(message, str(e))
if rollback: if rollback:
Session().rollback() Session.rollback()
if retry_wait: if retry_wait:
_LOGGER.info("Retrying in %s seconds", retry_wait) _LOGGER.info("Retrying in %s seconds", retry_wait)
time.sleep(retry_wait) time.sleep(retry_wait)
@ -145,19 +155,20 @@ class Recorder(threading.Thread):
"""A threaded recorder class.""" """A threaded recorder class."""
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
def __init__(self, hass, purge_days, uri): def __init__(self, hass: HomeAssistantType, purge_days: int, uri: str) \
-> None:
"""Initialize the recorder.""" """Initialize the recorder."""
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.hass = hass self.hass = hass
self.purge_days = purge_days self.purge_days = purge_days
self.queue = queue.Queue() self.queue = queue.Queue() # type: Any
self.quit_object = object() self.quit_object = object()
self.recording_start = dt_util.utcnow() self.recording_start = dt_util.utcnow()
self.db_url = uri self.db_url = uri
self.db_ready = threading.Event() self.db_ready = threading.Event()
self.engine = None self.engine = None # type: Any
self._run = None self._run = None # type: Any
def start_recording(event): def start_recording(event):
"""Start recording.""" """Start recording."""
@ -276,7 +287,7 @@ class Recorder(threading.Thread):
run.end = self.recording_start run.end = self.recording_start
_LOGGER.warning("Ended unfinished session (id=%s from %s)", _LOGGER.warning("Ended unfinished session (id=%s from %s)",
run.run_id, run.start) run.run_id, run.start)
Session().add(run) Session.add(run)
_LOGGER.warning("Found unfinished sessions") _LOGGER.warning("Found unfinished sessions")
@ -321,7 +332,7 @@ class Recorder(threading.Thread):
if self._commit(_purge_events): if self._commit(_purge_events):
_LOGGER.info("Purged events created before %s", purge_before) _LOGGER.info("Purged events created before %s", purge_before)
Session().expire_all() Session.expire_all()
# Execute sqlite vacuum command to free up space on disk # Execute sqlite vacuum command to free up space on disk
if self.engine.driver == 'sqlite': if self.engine.driver == 'sqlite':
@ -346,7 +357,8 @@ class Recorder(threading.Thread):
return False return False
def _verify_instance(): def _verify_instance() -> None:
"""Throw error if recorder not initialized.""" """Throw error if recorder not initialized."""
if _INSTANCE is None: if _INSTANCE is None:
raise RuntimeError("Recorder not initialized.") raise RuntimeError("Recorder not initialized.")
_INSTANCE.block_till_db_ready()

View File

@ -0,0 +1,12 @@
"""Typing Helpers for Home-Assistant."""
from typing import NewType, Dict, Any
import homeassistant.core
# pylint: disable=invalid-name
ConfigType = NewType('ConfigType', Dict[str, Any])
HomeAssistantType = homeassistant.core.HomeAssistant
# Custom type for recorder Queries
QueryType = NewType('QueryType', Any)