diff --git a/.coveragerc b/.coveragerc
index 7873da5a31a..e3ba208e6bf 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -3,6 +3,7 @@ source = homeassistant
 omit =
     homeassistant/__main__.py
+    homeassistant/scripts/*.py
 
     # omit pieces of code that rely on external devices being present
     homeassistant/components/apcupsd.py
diff --git a/homeassistant/components/history.py b/homeassistant/components/history.py
index c7fdda5fe34..dbd143888f2 100644
--- a/homeassistant/components/history.py
+++ b/homeassistant/components/history.py
@@ -9,8 +9,8 @@ from collections import defaultdict
 from datetime import timedelta
 from itertools import groupby
 
-from homeassistant.components import recorder, script
 import homeassistant.util.dt as dt_util
+from homeassistant.components import recorder, script
 from homeassistant.components.http import HomeAssistantView
 
 DOMAIN = 'history'
@@ -27,13 +27,12 @@ def last_5_states(entity_id):
     """Return the last 5 states for entity_id."""
     entity_id = entity_id.lower()
 
-    query = """
-        SELECT * FROM states WHERE entity_id=? AND
-        last_changed=last_updated
-        ORDER BY state_id DESC LIMIT 0, 5
-    """
-
-    return recorder.query_states(query, (entity_id, ))
+    states = recorder.get_model('States')
+    return recorder.execute(
+        recorder.query('States').filter(
+            (states.entity_id == entity_id) &
+            (states.last_changed == states.last_updated)
+        ).order_by(states.state_id.desc()).limit(5))
 
 
 def get_significant_states(start_time, end_time=None, entity_id=None):
@@ -44,48 +43,42 @@ def get_significant_states(start_time, end_time=None, entity_id=None):
     as well as all states from certain domains (for instance
     thermostat so that we get current temperature in our graphs).
     """
-    where = """
-        (domain IN ({}) OR last_changed=last_updated)
-        AND domain NOT IN ({}) AND last_updated > ?
-    """.format(",".join("'%s'" % x for x in SIGNIFICANT_DOMAINS),
-               ",".join("'%s'" % x for x in IGNORE_DOMAINS))
-
-    data = [start_time]
+    states = recorder.get_model('States')
+    query = recorder.query('States').filter(
+        (states.domain.in_(SIGNIFICANT_DOMAINS) |
+         (states.last_changed == states.last_updated)) &
+        ((~states.domain.in_(IGNORE_DOMAINS)) &
+         (states.last_updated > start_time)))
 
     if end_time is not None:
-        where += "AND last_updated < ? "
-        data.append(end_time)
+        query = query.filter(states.last_updated < end_time)
 
     if entity_id is not None:
-        where += "AND entity_id = ? "
-        data.append(entity_id.lower())
+        query = query.filter_by(entity_id=entity_id.lower())
 
-    query = ("SELECT * FROM states WHERE {} "
-             "ORDER BY entity_id, last_updated ASC").format(where)
-
-    states = (state for state in recorder.query_states(query, data)
-              if _is_significant(state))
+    states = (
+        state for state in recorder.execute(
+            query.order_by(states.entity_id, states.last_updated))
+        if _is_significant(state))
 
     return states_to_json(states, start_time, entity_id)
 
 
 def state_changes_during_period(start_time, end_time=None, entity_id=None):
     """Return states changes during UTC period start_time - end_time."""
-    where = "last_changed=last_updated AND last_changed > ? "
-    data = [start_time]
+    states = recorder.get_model('States')
+    query = recorder.query('States').filter(
+        (states.last_changed == states.last_updated) &
+        (states.last_changed > start_time))
 
     if end_time is not None:
-        where += "AND last_changed < ? "
-        data.append(end_time)
+        query = query.filter(states.last_updated < end_time)
 
     if entity_id is not None:
-        where += "AND entity_id = ? "
" - data.append(entity_id.lower()) + query = query.filter_by(entity_id=entity_id.lower()) - query = ("SELECT * FROM states WHERE {} " - "ORDER BY entity_id, last_changed ASC").format(where) - - states = recorder.query_states(query, data) + states = recorder.execute( + query.order_by(states.entity_id, states.last_updated)) return states_to_json(states, start_time, entity_id) @@ -99,24 +92,27 @@ def get_states(utc_point_in_time, entity_ids=None, run=None): if run is None: return [] - where = run.where_after_start_run + "AND created < ? " - where_data = [utc_point_in_time] + from sqlalchemy import and_, func + + states = recorder.get_model('States') + most_recent_state_ids = recorder.query( + func.max(states.state_id).label('max_state_id') + ).filter( + (states.created >= run.start) & + (states.created < utc_point_in_time) + ) if entity_ids is not None: - where += "AND entity_id IN ({}) ".format( - ",".join(['?'] * len(entity_ids))) - where_data.extend(entity_ids) + most_recent_state_ids = most_recent_state_ids.filter( + states.entity_id.in_(entity_ids)) - query = """ - SELECT * FROM states - INNER JOIN ( - SELECT max(state_id) AS max_state_id - FROM states WHERE {} - GROUP BY entity_id) - WHERE state_id = max_state_id - """.format(where) + most_recent_state_ids = most_recent_state_ids.group_by( + states.entity_id).subquery() - return recorder.query_states(query, where_data) + query = recorder.query('States').join(most_recent_state_ids, and_( + states.state_id == most_recent_state_ids.c.max_state_id)) + + return recorder.execute(query) def states_to_json(states, start_time, entity_id): diff --git a/homeassistant/components/logbook.py b/homeassistant/components/logbook.py index 82747e73093..433dd468c63 100644 --- a/homeassistant/components/logbook.py +++ b/homeassistant/components/logbook.py @@ -11,27 +11,23 @@ from itertools import groupby import voluptuous as vol +import homeassistant.helpers.config_validation as cv import homeassistant.util.dt as dt_util from homeassistant.components import recorder, sun -from homeassistant.const import ( - EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, - STATE_NOT_HOME, STATE_OFF, STATE_ON) +from homeassistant.components.http import HomeAssistantView +from homeassistant.const import (EVENT_HOMEASSISTANT_START, + EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, + STATE_NOT_HOME, STATE_OFF, STATE_ON) from homeassistant.core import DOMAIN as HA_DOMAIN from homeassistant.core import State -from homeassistant.helpers.entity import split_entity_id from homeassistant.helpers import template -import homeassistant.helpers.config_validation as cv -from homeassistant.components.http import HomeAssistantView +from homeassistant.helpers.entity import split_entity_id DOMAIN = "logbook" DEPENDENCIES = ['recorder', 'http'] URL_LOGBOOK = re.compile(r'/api/logbook(?:/(?P\d{4}-\d{1,2}-\d{1,2})|)') -QUERY_EVENTS_BETWEEN = """ - SELECT * FROM events WHERE time_fired > ? AND time_fired < ? 
-""" - _LOGGER = logging.getLogger(__name__) EVENT_LOGBOOK_ENTRY = 'logbook_entry' @@ -100,9 +96,11 @@ class LogbookView(HomeAssistantView): end_day = start_day + timedelta(days=1) - events = recorder.query_events( - QUERY_EVENTS_BETWEEN, - (dt_util.as_utc(start_day), dt_util.as_utc(end_day))) + events = recorder.get_model('Events') + query = recorder.query('Events').filter( + (events.time_fired > start_day) & + (events.time_fired < end_day)) + events = recorder.execute(query) return self.json(humanify(events)) diff --git a/homeassistant/components/recorder.py b/homeassistant/components/recorder.py deleted file mode 100644 index 0c7454ad694..00000000000 --- a/homeassistant/components/recorder.py +++ /dev/null @@ -1,529 +0,0 @@ -""" -Support for recording details. - -Component that records all events and state changes. Allows other components -to query this database. - -For more details about this component, please refer to the documentation at -https://home-assistant.io/components/recorder/ -""" -import atexit -import json -import logging -import queue -import sqlite3 -import threading -from datetime import date, datetime, timedelta -import voluptuous as vol - -import homeassistant.util.dt as dt_util -from homeassistant.const import ( - EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, - EVENT_TIME_CHANGED, MATCH_ALL) -from homeassistant.core import Event, EventOrigin, State -from homeassistant.remote import JSONEncoder -from homeassistant.helpers.event import track_point_in_utc_time - -DOMAIN = "recorder" - -DB_FILE = 'home-assistant.db' - -RETURN_ROWCOUNT = "rowcount" -RETURN_LASTROWID = "lastrowid" -RETURN_ONE_ROW = "one_row" - -CONF_PURGE_DAYS = "purge_days" -CONFIG_SCHEMA = vol.Schema({ - DOMAIN: vol.Schema({ - vol.Optional(CONF_PURGE_DAYS): vol.All(vol.Coerce(int), - vol.Range(min=1)), - }) -}, extra=vol.ALLOW_EXTRA) - - -_INSTANCE = None -_LOGGER = logging.getLogger(__name__) - - -def query(sql_query, arguments=None): - """Query the database.""" - _verify_instance() - - return _INSTANCE.query(sql_query, arguments) - - -def query_states(state_query, arguments=None): - """Query the database and return a list of states.""" - return [ - row for row in - (row_to_state(row) for row in query(state_query, arguments)) - if row is not None] - - -def query_events(event_query, arguments=None): - """Query the database and return a list of states.""" - return [ - row for row in - (row_to_event(row) for row in query(event_query, arguments)) - if row is not None] - - -def row_to_state(row): - """Convert a database row to a state.""" - try: - return State( - row[1], row[2], json.loads(row[3]), - dt_util.utc_from_timestamp(row[4]), - dt_util.utc_from_timestamp(row[5])) - except ValueError: - # When json.loads fails - _LOGGER.exception("Error converting row to state: %s", row) - return None - - -def row_to_event(row): - """Convert a databse row to an event.""" - try: - return Event(row[1], json.loads(row[2]), EventOrigin(row[3]), - dt_util.utc_from_timestamp(row[5])) - except ValueError: - # When json.loads fails - _LOGGER.exception("Error converting row to event: %s", row) - return None - - -def run_information(point_in_time=None): - """Return information about current run. - - There is also the run that covers point_in_time. 
- """ - _verify_instance() - - if point_in_time is None or point_in_time > _INSTANCE.recording_start: - return RecorderRun() - - run = _INSTANCE.query( - "SELECT * FROM recorder_runs WHERE start?", - (point_in_time, point_in_time), return_value=RETURN_ONE_ROW) - - return RecorderRun(run) if run else None - - -def setup(hass, config): - """Setup the recorder.""" - # pylint: disable=global-statement - global _INSTANCE - purge_days = config.get(DOMAIN, {}).get(CONF_PURGE_DAYS) - _INSTANCE = Recorder(hass, purge_days=purge_days) - - return True - - -class RecorderRun(object): - """Representation of a recorder run.""" - - def __init__(self, row=None): - """Initialize the recorder run.""" - self.end = None - - if row is None: - self.start = _INSTANCE.recording_start - self.closed_incorrect = False - else: - self.start = dt_util.utc_from_timestamp(row[1]) - - if row[2] is not None: - self.end = dt_util.utc_from_timestamp(row[2]) - - self.closed_incorrect = bool(row[3]) - - def entity_ids(self, point_in_time=None): - """Return the entity ids that existed in this run. - - Specify point_in_time if you want to know which existed at that point - in time inside the run. - """ - where = self.where_after_start_run - where_data = [] - - if point_in_time is not None or self.end is not None: - where += "AND created < ? " - where_data.append(point_in_time or self.end) - - return [row[0] for row in query( - "SELECT entity_id FROM states WHERE {}" - "GROUP BY entity_id".format(where), where_data)] - - @property - def where_after_start_run(self): - """Return SQL WHERE clause. - - Selection of the rows created after the start of the run. - """ - return "created >= {} ".format(_adapt_datetime(self.start)) - - @property - def where_limit_to_run(self): - """Return a SQL WHERE clause. - - For limiting the results to this run. 
- """ - where = self.where_after_start_run - - if self.end is not None: - where += "AND created < {} ".format(_adapt_datetime(self.end)) - - return where - - -class Recorder(threading.Thread): - """A threaded recorder class.""" - - # pylint: disable=too-many-instance-attributes - def __init__(self, hass, purge_days): - """Initialize the recorder.""" - threading.Thread.__init__(self) - - self.hass = hass - self.purge_days = purge_days - self.conn = None - self.queue = queue.Queue() - self.quit_object = object() - self.lock = threading.Lock() - self.recording_start = dt_util.utcnow() - self.utc_offset = dt_util.now().utcoffset().total_seconds() - self.db_path = self.hass.config.path(DB_FILE) - - def start_recording(event): - """Start recording.""" - self.start() - - hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording) - hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown) - hass.bus.listen(MATCH_ALL, self.event_listener) - - def run(self): - """Start processing events to save.""" - self._setup_connection() - self._setup_run() - if self.purge_days is not None: - track_point_in_utc_time(self.hass, - lambda now: self._purge_old_data(), - dt_util.utcnow() + timedelta(minutes=5)) - - while True: - event = self.queue.get() - - if event == self.quit_object: - self._close_run() - self._close_connection() - self.queue.task_done() - return - - elif event.event_type == EVENT_TIME_CHANGED: - self.queue.task_done() - continue - - event_id = self.record_event(event) - - if event.event_type == EVENT_STATE_CHANGED: - self.record_state( - event.data['entity_id'], event.data.get('new_state'), - event_id) - - self.queue.task_done() - - def event_listener(self, event): - """Listen for new events and put them in the process queue.""" - self.queue.put(event) - - def shutdown(self, event): - """Tell the recorder to shut down.""" - self.queue.put(self.quit_object) - self.block_till_done() - - def record_state(self, entity_id, state, event_id): - """Save a state to the database.""" - now = dt_util.utcnow() - - # State got deleted - if state is None: - state_state = '' - state_domain = '' - state_attr = '{}' - last_changed = last_updated = now - else: - state_domain = state.domain - state_state = state.state - state_attr = json.dumps(dict(state.attributes)) - last_changed = state.last_changed - last_updated = state.last_updated - - info = ( - entity_id, state_domain, state_state, state_attr, - last_changed, last_updated, - now, self.utc_offset, event_id) - - self.query( - """ - INSERT INTO states ( - entity_id, domain, state, attributes, last_changed, last_updated, - created, utc_offset, event_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, - info) - - def record_event(self, event): - """Save an event to the database.""" - info = ( - event.event_type, json.dumps(event.data, cls=JSONEncoder), - str(event.origin), dt_util.utcnow(), event.time_fired, - self.utc_offset - ) - - return self.query( - "INSERT INTO events (" - "event_type, event_data, origin, created, time_fired, utc_offset" - ") VALUES (?, ?, ?, ?, ?, ?)", info, RETURN_LASTROWID) - - def query(self, sql_query, data=None, return_value=None): - """Query the database.""" - try: - with self.conn, self.lock: - _LOGGER.debug("Running query %s", sql_query) - - cur = self.conn.cursor() - - if data is not None: - cur.execute(sql_query, data) - else: - cur.execute(sql_query) - - if return_value == RETURN_ROWCOUNT: - return cur.rowcount - elif return_value == RETURN_LASTROWID: - return cur.lastrowid - elif return_value == RETURN_ONE_ROW: - return cur.fetchone() - else: - return cur.fetchall() - - except (sqlite3.IntegrityError, sqlite3.OperationalError, - sqlite3.ProgrammingError): - _LOGGER.exception( - "Error querying the database using: %s", sql_query) - return [] - - def block_till_done(self): - """Block till all events processed.""" - self.queue.join() - - def _setup_connection(self): - """Ensure database is ready to fly.""" - self.conn = sqlite3.connect(self.db_path, check_same_thread=False) - self.conn.row_factory = sqlite3.Row - - # Make sure the database is closed whenever Python exits - # without the STOP event being fired. - atexit.register(self._close_connection) - - # Have datetime objects be saved as integers. - sqlite3.register_adapter(date, _adapt_datetime) - sqlite3.register_adapter(datetime, _adapt_datetime) - - # Validate we are on the correct schema or that we have to migrate. - cur = self.conn.cursor() - - def save_migration(migration_id): - """Save and commit a migration to the database.""" - cur.execute('INSERT INTO schema_version VALUES (?, ?)', - (migration_id, dt_util.utcnow())) - self.conn.commit() - _LOGGER.info("Database migrated to version %d", migration_id) - - try: - cur.execute('SELECT max(migration_id) FROM schema_version;') - migration_id = cur.fetchone()[0] or 0 - - except sqlite3.OperationalError: - # The table does not exist. 
-            cur.execute('CREATE TABLE schema_version ('
-                        'migration_id integer primary key, performed integer)')
-            migration_id = 0
-
-        if migration_id < 1:
-            cur.execute("""
-                CREATE TABLE recorder_runs (
-                    run_id integer primary key,
-                    start integer,
-                    end integer,
-                    closed_incorrect integer default 0,
-                    created integer)
-            """)
-
-            cur.execute("""
-                CREATE TABLE events (
-                    event_id integer primary key,
-                    event_type text,
-                    event_data text,
-                    origin text,
-                    created integer)
-            """)
-            cur.execute(
-                'CREATE INDEX events__event_type ON events(event_type)')
-
-            cur.execute("""
-                CREATE TABLE states (
-                    state_id integer primary key,
-                    entity_id text,
-                    state text,
-                    attributes text,
-                    last_changed integer,
-                    last_updated integer,
-                    created integer)
-            """)
-            cur.execute('CREATE INDEX states__entity_id ON states(entity_id)')
-
-            save_migration(1)
-
-        if migration_id < 2:
-            cur.execute("""
-                ALTER TABLE events
-                ADD COLUMN time_fired integer
-            """)
-
-            cur.execute('UPDATE events SET time_fired=created')
-
-            save_migration(2)
-
-        if migration_id < 3:
-            utc_offset = self.utc_offset
-
-            cur.execute("""
-                ALTER TABLE recorder_runs
-                ADD COLUMN utc_offset integer
-            """)
-
-            cur.execute("""
-                ALTER TABLE events
-                ADD COLUMN utc_offset integer
-            """)
-
-            cur.execute("""
-                ALTER TABLE states
-                ADD COLUMN utc_offset integer
-            """)
-
-            cur.execute("UPDATE recorder_runs SET utc_offset=?", [utc_offset])
-            cur.execute("UPDATE events SET utc_offset=?", [utc_offset])
-            cur.execute("UPDATE states SET utc_offset=?", [utc_offset])
-
-            save_migration(3)
-
-        if migration_id < 4:
-            # We had a bug where we did not save utc offset for recorder runs.
-            cur.execute(
-                """UPDATE recorder_runs SET utc_offset=?
-                   WHERE utc_offset IS NULL""", [self.utc_offset])
-
-            cur.execute("""
-                ALTER TABLE states
-                ADD COLUMN event_id integer
-            """)
-
-            save_migration(4)
-
-        if migration_id < 5:
-            # Add domain so that thermostat graphs look right.
-            try:
-                cur.execute("""
-                    ALTER TABLE states
-                    ADD COLUMN domain text
-                """)
-            except sqlite3.OperationalError:
-                # We had a bug in this migration for a while on dev.
-                # Without this, dev-users will have to throw away their db.
-                pass
-
-            # TravisCI has Python compiled against an old version of SQLite3
-            # which misses the instr method.
-            self.conn.create_function(
-                "instr", 2,
-                lambda string, substring: string.find(substring) + 1)
-
-            # Populate domain with defaults.
-            cur.execute("""
-                UPDATE states
-                set domain=substr(entity_id, 0, instr(entity_id, '.'))
-            """)
-
-            # Add indexes we are going to use a lot on selects.
-            cur.execute("""
-                CREATE INDEX states__state_changes ON
-                states (last_changed, last_updated, entity_id)""")
-            cur.execute("""
-                CREATE INDEX states__significant_changes ON
-                states (domain, last_updated, entity_id)""")
-            save_migration(5)
-
-    def _close_connection(self):
-        """Close connection to the database."""
-        _LOGGER.info("Closing database")
-        atexit.unregister(self._close_connection)
-        self.conn.close()
-
-    def _setup_run(self):
-        """Log the start of the current run."""
-        if self.query("""UPDATE recorder_runs SET end=?, closed_incorrect=1
-                      WHERE end IS NULL""", (self.recording_start, ),
-                      return_value=RETURN_ROWCOUNT):
-
-            _LOGGER.warning("Found unfinished sessions")
-
-        self.query(
-            """INSERT INTO recorder_runs (start, created, utc_offset)
-               VALUES (?, ?, ?)""",
-            (self.recording_start, dt_util.utcnow(), self.utc_offset))
-
-    def _close_run(self):
-        """Save end time for current run."""
-        self.query(
-            "UPDATE recorder_runs SET end=? WHERE start=?",
WHERE start=?", - (dt_util.utcnow(), self.recording_start)) - - def _purge_old_data(self): - """Purge events and states older than purge_days ago.""" - if not self.purge_days or self.purge_days < 1: - _LOGGER.debug("purge_days set to %s, will not purge any old data.", - self.purge_days) - return - - purge_before = dt_util.utcnow() - timedelta(days=self.purge_days) - - _LOGGER.info("Purging events created before %s", purge_before) - deleted_rows = self.query( - sql_query="DELETE FROM events WHERE created < ?;", - data=(int(purge_before.timestamp()),), - return_value=RETURN_ROWCOUNT) - _LOGGER.debug("Deleted %s events", deleted_rows) - - _LOGGER.info("Purging states created before %s", purge_before) - deleted_rows = self.query( - sql_query="DELETE FROM states WHERE created < ?;", - data=(int(purge_before.timestamp()),), - return_value=RETURN_ROWCOUNT) - _LOGGER.debug("Deleted %s states", deleted_rows) - - # Execute sqlite vacuum command to free up space on disk - self.query("VACUUM;") - - -def _adapt_datetime(datetimestamp): - """Turn a datetime into an integer for in the DB.""" - return dt_util.as_utc(datetimestamp).timestamp() - - -def _verify_instance(): - """Throw error if recorder not initialized.""" - if _INSTANCE is None: - raise RuntimeError("Recorder not initialized.") diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py new file mode 100644 index 00000000000..77f70d00000 --- /dev/null +++ b/homeassistant/components/recorder/__init__.py @@ -0,0 +1,315 @@ +""" +Support for recording details. + +Component that records all events and state changes. Allows other components +to query this database. + +For more details about this component, please refer to the documentation at +https://home-assistant.io/components/recorder/ +""" +import logging +import queue +import threading +import time +from datetime import timedelta + +import voluptuous as vol + +import homeassistant.util.dt as dt_util +from homeassistant.const import (EVENT_HOMEASSISTANT_START, + EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, + EVENT_TIME_CHANGED, MATCH_ALL) +from homeassistant.helpers.event import track_point_in_utc_time + +DOMAIN = "recorder" + +REQUIREMENTS = ['sqlalchemy==1.0.13'] + +DEFAULT_URL = "sqlite:///{hass_config_path}" +DEFAULT_DB_FILE = "home-assistant_v2.db" + +CONF_DB_URL = "db_url" +CONF_PURGE_DAYS = "purge_days" + +RETRIES = 3 +CONNECT_RETRY_WAIT = 10 +QUERY_RETRY_WAIT = 0.1 + +CONFIG_SCHEMA = vol.Schema({ + DOMAIN: vol.Schema({ + vol.Optional(CONF_PURGE_DAYS): vol.All(vol.Coerce(int), + vol.Range(min=1)), + vol.Optional(CONF_DB_URL): vol.Url(''), + }) +}, extra=vol.ALLOW_EXTRA) + +_INSTANCE = None +_LOGGER = logging.getLogger(__name__) + +# These classes will be populated during setup() +# pylint: disable=invalid-name +Session = None + + +def execute(q): + """Query the database and convert the objects to HA native form. + + This method also retries a few times in the case of stale connections. + """ + import sqlalchemy.exc + for _ in range(0, RETRIES): + try: + return [ + row for row in + (row.to_native() for row in q) + if row is not None] + except sqlalchemy.exc.SQLAlchemyError as e: + log_error(e, retry_wait=QUERY_RETRY_WAIT, rollback=True) + return [] + + +def run_information(point_in_time=None): + """Return information about current run. + + There is also the run that covers point_in_time. 
+ """ + _verify_instance() + + recorder_runs = get_model('RecorderRuns') + if point_in_time is None or point_in_time > _INSTANCE.recording_start: + return recorder_runs( + end=None, + start=_INSTANCE.recording_start, + closed_incorrect=False) + + return query('RecorderRuns').filter( + (recorder_runs.start < point_in_time) & + (recorder_runs.end > point_in_time)).first() + + +def setup(hass, config): + """Setup the recorder.""" + # pylint: disable=global-statement + # pylint: disable=too-many-locals + global _INSTANCE + purge_days = config.get(DOMAIN, {}).get(CONF_PURGE_DAYS) + + db_url = config.get(DOMAIN, {}).get(CONF_DB_URL, None) + if not db_url: + db_url = DEFAULT_URL.format( + hass_config_path=hass.config.path(DEFAULT_DB_FILE)) + + _INSTANCE = Recorder(hass, purge_days=purge_days, uri=db_url) + + return True + + +def query(model_name, *args): + """Helper to return a query handle.""" + if isinstance(model_name, str): + return Session().query(get_model(model_name), *args) + return Session().query(model_name, *args) + + +def get_model(model_name): + """Get a model class.""" + from homeassistant.components.recorder import models + + return getattr(models, model_name) + + +def log_error(e, retry_wait=0, rollback=True, + message="Error during query: %s"): + """Log about SQLAlchemy errors in a sane manner.""" + import sqlalchemy.exc + if not isinstance(e, sqlalchemy.exc.OperationalError): + _LOGGER.exception(e) + else: + _LOGGER.error(message, str(e)) + if rollback: + Session().rollback() + if retry_wait: + _LOGGER.info("Retrying failed query in %s seconds", QUERY_RETRY_WAIT) + time.sleep(retry_wait) + + +class Recorder(threading.Thread): + """A threaded recorder class.""" + + # pylint: disable=too-many-instance-attributes + def __init__(self, hass, purge_days, uri): + """Initialize the recorder.""" + threading.Thread.__init__(self) + + self.hass = hass + self.purge_days = purge_days + self.queue = queue.Queue() + self.quit_object = object() + self.recording_start = dt_util.utcnow() + self.db_url = uri + self.db_ready = threading.Event() + self.engine = None + self._run = None + + def start_recording(event): + """Start recording.""" + self.start() + + hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording) + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown) + hass.bus.listen(MATCH_ALL, self.event_listener) + + def run(self): + """Start processing events to save.""" + from homeassistant.components.recorder.models import Events, States + import sqlalchemy.exc + + while True: + try: + self._setup_connection() + self._setup_run() + break + except sqlalchemy.exc.SQLAlchemyError as e: + log_error(e, retry_wait=CONNECT_RETRY_WAIT, rollback=False, + message="Error during connection setup: %s") + + if self.purge_days is not None: + track_point_in_utc_time(self.hass, + lambda now: self._purge_old_data(), + dt_util.utcnow() + timedelta(minutes=5)) + + while True: + event = self.queue.get() + + if event == self.quit_object: + self._close_run() + self.queue.task_done() + return + + elif event.event_type == EVENT_TIME_CHANGED: + self.queue.task_done() + continue + + for _ in range(0, RETRIES): + try: + event_id = Events.record_event(Session, event) + break + except sqlalchemy.exc.OperationalError as e: + log_error(e, retry_wait=QUERY_RETRY_WAIT, rollback=True) + + if event.event_type == EVENT_STATE_CHANGED: + for _ in range(0, RETRIES): + try: + States.record_state( + Session, + event.data['entity_id'], + event.data.get('new_state'), + event_id) + break + except 
+                        log_error(e, retry_wait=QUERY_RETRY_WAIT,
+                                  rollback=True)
+
+            self.queue.task_done()
+
+    def event_listener(self, event):
+        """Listen for new events and put them in the process queue."""
+        self.queue.put(event)
+
+    def shutdown(self, event):
+        """Tell the recorder to shut down."""
+        self.queue.put(self.quit_object)
+        self.block_till_done()
+
+    def block_till_done(self):
+        """Block till all events processed."""
+        self.queue.join()
+
+    def block_till_db_ready(self):
+        """Block until the database session is ready."""
+        self.db_ready.wait()
+
+    def _setup_connection(self):
+        """Ensure database is ready to fly."""
+        # pylint: disable=global-statement
+        global Session
+
+        import homeassistant.components.recorder.models as models
+        from sqlalchemy import create_engine
+        from sqlalchemy.orm import scoped_session
+        from sqlalchemy.orm import sessionmaker
+
+        if self.db_url == 'sqlite://' or ':memory:' in self.db_url:
+            from sqlalchemy.pool import StaticPool
+            self.engine = create_engine(
+                'sqlite://',
+                connect_args={'check_same_thread': False},
+                poolclass=StaticPool)
+        else:
+            self.engine = create_engine(self.db_url, echo=False)
+
+        models.Base.metadata.create_all(self.engine)
+        session_factory = sessionmaker(bind=self.engine)
+        Session = scoped_session(session_factory)
+        self.db_ready.set()
+
+    def _setup_run(self):
+        """Log the start of the current run."""
+        recorder_runs = get_model('RecorderRuns')
+        for run in query('RecorderRuns').filter_by(end=None):
+            run.closed_incorrect = True
+            run.end = self.recording_start
+            _LOGGER.warning("Ended unfinished session (id=%s from %s)",
+                            run.run_id, run.start)
+            Session().add(run)
+
+        self._run = recorder_runs(
+            start=self.recording_start,
+            created=dt_util.utcnow()
+        )
+        Session().add(self._run)
+        Session().commit()
+
+    def _close_run(self):
+        """Save end time for current run."""
+        self._run.end = dt_util.utcnow()
+        Session().add(self._run)
+        Session().commit()
+        self._run = None
+
+    def _purge_old_data(self):
+        """Purge events and states older than purge_days ago."""
+        from homeassistant.components.recorder.models import Events, States
+
+        if not self.purge_days or self.purge_days < 1:
+            _LOGGER.debug("purge_days set to %s, will not purge any old data.",
+                          self.purge_days)
+            return
+
+        purge_before = dt_util.utcnow() - timedelta(days=self.purge_days)
+
+        _LOGGER.info("Purging events created before %s", purge_before)
+        deleted_rows = Session().query(Events).filter(
+            (Events.created < purge_before)).delete(synchronize_session=False)
+        _LOGGER.debug("Deleted %s events", deleted_rows)
+
+        _LOGGER.info("Purging states created before %s", purge_before)
+        deleted_rows = Session().query(States).filter(
+            (States.created < purge_before)).delete(synchronize_session=False)
+        _LOGGER.debug("Deleted %s states", deleted_rows)
+
+        Session().commit()
+        Session().expire_all()
+
+        # Execute sqlite vacuum command to free up space on disk
+        if self.engine.driver == 'sqlite':
+            _LOGGER.info("Vacuuming SQLite to free space")
+            self.engine.execute("VACUUM")
+
+
+def _verify_instance():
+    """Throw error if recorder not initialized."""
+    if _INSTANCE is None:
+        raise RuntimeError("Recorder not initialized.")
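Because setup() only falls back to the bundled SQLite file when db_url is absent, pointing the recorder at another engine is purely a configuration matter. A hedged sketch of driving setup() directly with a non-default URL (the MySQL URL is illustrative, borrowed from the conversion script's help text, and hass stands for a running HomeAssistant instance):

from homeassistant.components import recorder

# db_url accepts any SQLAlchemy connection URL; when omitted, the
# component builds sqlite:///<config dir>/home-assistant_v2.db instead.
config = {
    'recorder': {
        'db_url': 'mysql://localhost/homeassistant',  # illustrative URL
        'purge_days': 14,
    }
}
recorder.setup(hass, config)  # hass: a running HomeAssistant instance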
diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py
new file mode 100644
index 00000000000..6f6cc28dbfc
--- /dev/null
+++ b/homeassistant/components/recorder/models.py
@@ -0,0 +1,151 @@
+"""Models for SQLAlchemy."""
+
+import json
+from datetime import datetime
+import logging
+
+from sqlalchemy import (Boolean, Column, DateTime, ForeignKey, Index, Integer,
+                        String, Text, distinct)
+from sqlalchemy.ext.declarative import declarative_base
+import homeassistant.util.dt as dt_util
+from homeassistant.core import Event, EventOrigin, State
+from homeassistant.remote import JSONEncoder
+
+# SQLAlchemy Schema
+# pylint: disable=invalid-name
+Base = declarative_base()
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class Events(Base):
+    # pylint: disable=too-few-public-methods
+    """Event history data."""
+
+    __tablename__ = 'events'
+    event_id = Column(Integer, primary_key=True)
+    event_type = Column(String(32), index=True)
+    event_data = Column(Text)
+    origin = Column(String(32))
+    time_fired = Column(DateTime(timezone=True))
+    created = Column(DateTime(timezone=True), default=datetime.utcnow)
+
+    @staticmethod
+    def record_event(session, event):
+        """Save an event to the database."""
+        dbevent = Events(event_type=event.event_type,
+                         event_data=json.dumps(event.data, cls=JSONEncoder),
+                         origin=str(event.origin),
+                         time_fired=event.time_fired)
+
+        session.add(dbevent)
+        session.commit()
+
+        return dbevent.event_id
+
+    def to_native(self):
+        """Convert to a native HA Event."""
+        try:
+            return Event(
+                self.event_type,
+                json.loads(self.event_data),
+                EventOrigin(self.origin),
+                dt_util.UTC.localize(self.time_fired)
+            )
+        except ValueError:
+            # When json.loads fails
+            _LOGGER.exception("Error converting to event: %s", self)
+            return None
+
+
+class States(Base):
+    # pylint: disable=too-few-public-methods
+    """State change history."""
+
+    __tablename__ = 'states'
+    state_id = Column(Integer, primary_key=True)
+    domain = Column(String(64))
+    entity_id = Column(String(64))
+    state = Column(String(255))
+    attributes = Column(Text)
+    origin = Column(String(32))
+    event_id = Column(Integer, ForeignKey('events.event_id'))
+    last_changed = Column(DateTime(timezone=True), default=datetime.utcnow)
+    last_updated = Column(DateTime(timezone=True), default=datetime.utcnow)
+    created = Column(DateTime(timezone=True), default=datetime.utcnow)
+
+    __table_args__ = (Index('states__state_changes',
+                            'last_changed', 'last_updated', 'entity_id'),
+                      Index('states__significant_changes',
+                            'domain', 'last_updated', 'entity_id'), )
+
+    @staticmethod
+    def record_state(session, entity_id, state, event_id):
+        """Save a state to the database."""
+        now = dt_util.utcnow()
+
+        dbstate = States(event_id=event_id, entity_id=entity_id)
+
+        # State got deleted
+        if state is None:
+            dbstate.state = ''
+            dbstate.domain = ''
+            dbstate.attributes = '{}'
+            dbstate.last_changed = now
+            dbstate.last_updated = now
+        else:
+            dbstate.domain = state.domain
+            dbstate.state = state.state
+            dbstate.attributes = json.dumps(dict(state.attributes))
+            dbstate.last_changed = state.last_changed
+            dbstate.last_updated = state.last_updated
+
+        session().add(dbstate)
+        session().commit()
+
+    def to_native(self):
+        """Convert to an HA state object."""
+        try:
+            return State(
+                self.entity_id, self.state,
+                json.loads(self.attributes),
+                dt_util.UTC.localize(self.last_changed),
+                dt_util.UTC.localize(self.last_updated)
+            )
+        except ValueError:
+            # When json.loads fails
+            _LOGGER.exception("Error converting row to state: %s", self)
+            return None
+
+
+class RecorderRuns(Base):
+    # pylint: disable=too-few-public-methods
+    """Representation of recorder run."""
+
+    __tablename__ = 'recorder_runs'
+    run_id = Column(Integer, primary_key=True)
+    start = Column(DateTime(timezone=True), default=datetime.utcnow)
+    end = Column(DateTime(timezone=True))
+    closed_incorrect = Column(Boolean, default=False)
+    created = Column(DateTime(timezone=True), default=datetime.utcnow)
+
+    def entity_ids(self, point_in_time=None):
+        """Return the entity ids that existed in this run.
+
+        Specify point_in_time if you want to know which existed at that point
+        in time inside the run.
+        """
+        from homeassistant.components.recorder import Session, _verify_instance
+        _verify_instance()
+
+        query = Session().query(distinct(States.entity_id)).filter(
+            States.created >= self.start)
+
+        if point_in_time is not None or self.end is not None:
+            query = query.filter(States.created < (point_in_time or self.end))
+
+        return [row.entity_id for row in query]
+
+    def to_native(self):
+        """Return self, native format is this model."""
+        return self
diff --git a/homeassistant/scripts/convert_db_to_sqlalchemy.py b/homeassistant/scripts/convert_db_to_sqlalchemy.py
new file mode 100755
index 00000000000..85104a7d38f
--- /dev/null
+++ b/homeassistant/scripts/convert_db_to_sqlalchemy.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python
+
+"""Script to convert an old-format home-assistant.db to a new format one."""
+
+import argparse
+import os.path
+import sqlite3
+import sys
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from homeassistant.components.recorder import models
+import homeassistant.config as config_util
+import homeassistant.util.dt as dt_util
+
+
+def ts_to_dt(timestamp):
+    """Convert a timestamp from the old DB into a UTC datetime."""
+    if timestamp is None:
+        return None
+    return dt_util.utc_from_timestamp(timestamp)
+
+
+# Based on code at
+# http://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
+# pylint: disable=too-many-arguments
+def print_progress(iteration, total, prefix='', suffix='', decimals=2,
+                   bar_length=68):
+    """Print progress bar.
+
+    Call in a loop to create terminal progress bar
+    @params:
+        iteration   - Required  : current iteration (Int)
+        total       - Required  : total iterations (Int)
+        prefix      - Optional  : prefix string (Str)
+        suffix      - Optional  : suffix string (Str)
+        decimals    - Optional  : number of decimals in percent complete (Int)
+        bar_length  - Optional  : character length of bar (Int)
+    """
+    filled_length = int(round(bar_length * iteration / float(total)))
+    percents = round(100.00 * (iteration / float(total)), decimals)
+    line = '#' * filled_length + '-' * (bar_length - filled_length)
+    sys.stdout.write('%s [%s] %s%s %s\r' % (prefix, line,
+                                            percents, '%', suffix))
+    sys.stdout.flush()
+    if iteration == total:
+        print("\n")
+
+
+def main():
+    """The actual script body."""
+    # pylint: disable=too-many-locals,invalid-name,too-many-statements
+    parser = argparse.ArgumentParser(
+        description="Home Assistant: Observe, Control, Automate.")
+    parser.add_argument(
+        '-c', '--config',
+        metavar='path_to_config_dir',
+        default=config_util.get_default_config_dir(),
+        help="Directory that contains the Home Assistant configuration")
+    parser.add_argument(
+        '-a', '--append',
+        action='store_true',
+        default=False,
+        help="Append to existing new format SQLite database")
+    parser.add_argument(
+        '--uri',
+        type=str,
+        help="Connect to URI and import (implies --append) "
+             "eg: mysql://localhost/homeassistant")
+
+    args = parser.parse_args()
+
+    config_dir = os.path.join(os.getcwd(), args.config)
+
+    # Test if configuration directory exists
+    if not os.path.isdir(config_dir):
+        if config_dir != config_util.get_default_config_dir():
+            print(('Fatal Error: Specified configuration directory does '
+                   'not exist {} ').format(config_dir))
+            sys.exit(1)
+        else:
+            config_dir = config_util.get_default_config_dir()
+
+    src_db = '{}/home-assistant.db'.format(config_dir)
+    dst_db = '{}/home-assistant_v2.db'.format(config_dir)
+
+    if not os.path.exists(src_db):
+        print("Fatal Error: Old format database '{}' does not exist".format(
+            src_db))
+        sys.exit(1)
+    if not args.uri and (os.path.exists(dst_db) and not args.append):
+        print("Fatal Error: New format database '{}' exists already - "
+              "Remove it or use --append".format(dst_db))
+        print("Note: --append must maintain an ID mapping and is much slower")
+        print("and requires sufficient memory to track all event IDs")
+        sys.exit(1)
+
+    conn = sqlite3.connect(src_db)
+    uri = args.uri or "sqlite:///{}".format(dst_db)
+
+    engine = create_engine(uri, echo=False)
+    models.Base.metadata.create_all(engine)
+    session_factory = sessionmaker(bind=engine)
+    session = session_factory()
+
+    append = args.append or args.uri
+
+    c = conn.cursor()
+    c.execute("SELECT count(*) FROM recorder_runs")
+    num_rows = c.fetchone()[0]
+    print("Converting {} recorder_runs".format(num_rows))
+    c.close()
+
+    c = conn.cursor()
+    n = 0
+    for row in c.execute("SELECT * FROM recorder_runs"):
+        n += 1
+        session.add(models.RecorderRuns(
+            start=ts_to_dt(row[1]),
+            end=ts_to_dt(row[2]),
+            closed_incorrect=row[3],
+            created=ts_to_dt(row[4])
+        ))
+        if n % 1000 == 0:
+            session.commit()
+            print_progress(n, num_rows)
+    print_progress(n, num_rows)
+    session.commit()
+    c.close()
+
+    c = conn.cursor()
+    c.execute("SELECT count(*) FROM events")
+    num_rows = c.fetchone()[0]
+    print("Converting {} events".format(num_rows))
+    c.close()
+
+    id_mapping = {}
+
+    c = conn.cursor()
+    n = 0
+    for row in c.execute("SELECT * FROM events"):
+        n += 1
+        o = models.Events(
+            event_type=row[1],
+            event_data=row[2],
+            origin=row[3],
+            created=ts_to_dt(row[4]),
+            time_fired=ts_to_dt(row[5]),
+        )
+        session.add(o)
+        if append:
+            session.flush()
+            id_mapping[row[0]] = o.event_id
+        if n % 1000 == 0:
+            session.commit()
+            print_progress(n, num_rows)
+    print_progress(n, num_rows)
+    session.commit()
+    c.close()
+
+    c = conn.cursor()
+    c.execute("SELECT count(*) FROM states")
+    num_rows = c.fetchone()[0]
+    print("Converting {} states".format(num_rows))
+    c.close()
+
+    c = conn.cursor()
+    n = 0
+    for row in c.execute("SELECT * FROM states"):
+        n += 1
+        session.add(models.States(
+            entity_id=row[1],
+            state=row[2],
+            attributes=row[3],
+            last_changed=ts_to_dt(row[4]),
+            last_updated=ts_to_dt(row[5]),
+            event_id=id_mapping.get(row[6], row[6]),
+            domain=row[7]
+        ))
+        if n % 1000 == 0:
+            session.commit()
+            print_progress(n, num_rows)
+    print_progress(n, num_rows)
+    session.commit()
+    c.close()
+
+
+if __name__ == "__main__":
+    main()
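The --append/--uri path above hinges on session.flush() assigning the autoincrement event_id before commit, which is what lets id_mapping translate old-format event ids for the states table. A condensed, self-contained illustration of that idiom (the in-memory database and the old-format id 1234 are made up for the example):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from homeassistant.components.recorder import models

engine = create_engine('sqlite://')  # throwaway in-memory database
models.Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

id_mapping = {}
event = models.Events(event_type='EVENT_TEST', event_data='{}')
session.add(event)
session.flush()                      # primary key is populated here,
id_mapping[1234] = event.event_id    # before commit; 1234 mimics an old id
session.commit()
assert id_mapping[1234] == event.event_id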
diff --git a/requirements_all.txt b/requirements_all.txt
index d591ab7c91d..58b1eded462 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -383,6 +383,9 @@ somecomfort==0.2.1
 # homeassistant.components.sensor.speedtest
 speedtest-cli==0.3.4
 
+# homeassistant.components.recorder
+sqlalchemy==1.0.13
+
 # homeassistant.components.http
 static3==0.7.0
diff --git a/script/gen_requirements_all.py b/script/gen_requirements_all.py
index 872d13bab75..1fae3b92600 100755
--- a/script/gen_requirements_all.py
+++ b/script/gen_requirements_all.py
@@ -6,7 +6,7 @@ import pkgutil
 import re
 import sys
 
-COMMENT_REQUIREMENTS = [
+COMMENT_REQUIREMENTS = (
     'RPi.GPIO',
     'rpi-rf',
     'Adafruit_Python_DHT',
@@ -14,7 +14,11 @@ COMMENT_REQUIREMENTS = [
     'pybluez',
     'bluepy',
     'python-lirc',
-]
+)
+
+IGNORE_PACKAGES = (
+    'homeassistant.components.recorder.models',
+)
 
 
 def explore_module(package, explore_children):
@@ -59,7 +63,8 @@ def gather_modules():
         try:
             module = importlib.import_module(package)
         except ImportError:
-            errors.append(package)
+            if package not in IGNORE_PACKAGES:
+                errors.append(package)
             continue
 
         if not getattr(module, 'REQUIREMENTS', None):
diff --git a/tests/components/test_history.py b/tests/components/test_history.py
index 77dc24655ba..447629ee070 100644
--- a/tests/components/test_history.py
+++ b/tests/components/test_history.py
@@ -25,9 +25,13 @@ class TestComponentHistory(unittest.TestCase):
 
     def init_recorder(self):
         """Initialize the recorder."""
-        with patch('homeassistant.core.Config.path', return_value=':memory:'):
-            recorder.setup(self.hass, {})
+        db_uri = 'sqlite://'
+        with patch('homeassistant.core.Config.path', return_value=db_uri):
+            recorder.setup(self.hass, config={
+                "recorder": {
+                    "db_url": db_uri}})
         self.hass.start()
+        recorder._INSTANCE.block_till_db_ready()
         self.wait_recording_done()
 
     def wait_recording_done(self):
diff --git a/tests/components/test_recorder.py b/tests/components/test_recorder.py
index 0577ab27889..08efaa71bbf 100644
--- a/tests/components/test_recorder.py
+++ b/tests/components/test_recorder.py
@@ -1,8 +1,8 @@
 """The tests for the Recorder component."""
 # pylint: disable=too-many-public-methods,protected-access
 import unittest
-import time
 import json
+from datetime import datetime, timedelta
 from unittest.mock import patch
 
 from homeassistant.const import MATCH_ALL
@@ -17,9 +17,14 @@ class TestRecorder(unittest.TestCase):
     def setUp(self):  # pylint: disable=invalid-name
         """Setup things to be run when tests are started."""
         self.hass = get_test_home_assistant()
-        with patch('homeassistant.core.Config.path', return_value=':memory:'):
-            recorder.setup(self.hass, {})
+        db_uri = 'sqlite://'
+        with patch('homeassistant.core.Config.path', return_value=db_uri):
+            recorder.setup(self.hass, config={
+                "recorder": {
+                    "db_url": db_uri}})
         self.hass.start()
+        recorder._INSTANCE.block_till_db_ready()
+        self.session = recorder.Session()
         recorder._INSTANCE.block_till_done()
 
     def tearDown(self):  # pylint: disable=invalid-name
@@ -29,12 +34,13 @@ class TestRecorder(unittest.TestCase):
 
     def _add_test_states(self):
         """Add multiple states to the db for testing."""
-        now = int(time.time())
-        five_days_ago = now - (60*60*24*5)
+        now = datetime.now()
+        five_days_ago = now - timedelta(days=5)
         attributes = {'test_attr': 5, 'test_attr_10': 'nice'}
 
         self.hass.pool.block_till_done()
         recorder._INSTANCE.block_till_done()
+
        for event_id in range(5):
            if event_id < 3:
                timestamp = five_days_ago
                state = 'purgeme'
            else:
                timestamp = now
                state = 'dontpurgeme'
-            recorder.query("INSERT INTO states ("
-                           "entity_id, domain, state, attributes,"
-                           "last_changed, last_updated, created,"
-                           "utc_offset, event_id)"
-                           "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
-                           ('test.recorder2', 'sensor', state,
-                            json.dumps(attributes), timestamp, timestamp,
-                            timestamp, -18000, event_id + 1000))
+
+            self.session.add(recorder.get_model('States')(
+                entity_id='test.recorder2',
+                domain='sensor',
+                state=state,
+                attributes=json.dumps(attributes),
+                last_changed=timestamp,
+                last_updated=timestamp,
+                created=timestamp,
+                event_id=event_id + 1000
+            ))
+
+        self.session.commit()
 
     def _add_test_events(self):
         """Add a few events for testing."""
-        now = int(time.time())
-        five_days_ago = now - (60*60*24*5)
+        now = datetime.now()
+        five_days_ago = now - timedelta(days=5)
         event_data = {'test_attr': 5, 'test_attr_10': 'nice'}
 
         self.hass.pool.block_till_done()
@@ -66,12 +77,14 @@ class TestRecorder(unittest.TestCase):
             else:
                 timestamp = now
                 event_type = 'EVENT_TEST'
-            recorder.query("INSERT INTO events"
-                           "(event_type, event_data, origin, created,"
-                           "time_fired, utc_offset)"
-                           "VALUES (?, ?, ?, ?, ?, ?)",
-                           (event_type, json.dumps(event_data), 'LOCAL',
-                            timestamp, timestamp, -18000))
+
+            self.session.add(recorder.get_model('Events')(
+                event_type=event_type,
+                event_data=json.dumps(event_data),
+                origin='LOCAL',
+                created=timestamp,
+                time_fired=timestamp,
+            ))
 
     def test_saving_state(self):
         """Test saving and restoring a state."""
@@ -84,7 +97,8 @@ class TestRecorder(unittest.TestCase):
         self.hass.pool.block_till_done()
         recorder._INSTANCE.block_till_done()
 
-        states = recorder.query_states('SELECT * FROM states')
+        states = recorder.execute(
+            recorder.query('States'))
 
         self.assertEqual(1, len(states))
         self.assertEqual(self.hass.states.get(entity_id), states[0])
@@ -108,8 +122,9 @@ class TestRecorder(unittest.TestCase):
         self.hass.pool.block_till_done()
         recorder._INSTANCE.block_till_done()
 
-        db_events = recorder.query_events(
-            'SELECT * FROM events WHERE event_type = ?', (event_type, ))
+        db_events = recorder.execute(
+            recorder.query('Events').filter_by(
+                event_type=event_type))
 
         assert len(events) == 1
         assert len(db_events) == 1
@@ -129,51 +144,45 @@ class TestRecorder(unittest.TestCase):
         """Test deleting old states."""
         self._add_test_states()
         # make sure we start with 5 states
-        states = recorder.query_states('SELECT * FROM states')
-        self.assertEqual(len(states), 5)
+        states = recorder.query('States')
+        self.assertEqual(states.count(), 5)
 
         # run purge_old_data()
         recorder._INSTANCE.purge_days = 4
         recorder._INSTANCE._purge_old_data()
 
         # we should only have 2 states left after purging
-        states = recorder.query_states('SELECT * FROM states')
-        self.assertEqual(len(states), 2)
+        self.assertEqual(states.count(), 2)
 
     def test_purge_old_events(self):
         """Test deleting old events."""
         self._add_test_events()
 
-        events = recorder.query_events('SELECT * FROM events WHERE '
-                                       'event_type LIKE "EVENT_TEST%"')
-        self.assertEqual(len(events), 5)
+        events = recorder.query('Events').filter(
+            recorder.get_model('Events').event_type.like("EVENT_TEST%"))
+        self.assertEqual(events.count(), 5)
 
         # run purge_old_data()
         recorder._INSTANCE.purge_days = 4
         recorder._INSTANCE._purge_old_data()
 
         # now we should only have 3 events left
-        events = recorder.query_events('SELECT * FROM events WHERE '
-                                       'event_type LIKE "EVENT_TEST%"')
-        self.assertEqual(len(events), 3)
+        self.assertEqual(events.count(), 3)
 
     def test_purge_disabled(self):
         """Test leaving purge_days disabled."""
         self._add_test_states()
         self._add_test_events()
         # make sure we start with 5 states and events
-        states = recorder.query_states('SELECT * FROM states')
-        events = recorder.query_events('SELECT * FROM events WHERE '
-                                       'event_type LIKE "EVENT_TEST%"')
-        self.assertEqual(len(states), 5)
-        self.assertEqual(len(events), 5)
+        states = recorder.query('States')
+        events = recorder.query('Events').filter(
+            recorder.get_model('Events').event_type.like("EVENT_TEST%"))
+        self.assertEqual(states.count(), 5)
+        self.assertEqual(events.count(), 5)
 
         # run purge_old_data()
         recorder._INSTANCE.purge_days = None
         recorder._INSTANCE._purge_old_data()
 
         # we should have all of our states still
-        states = recorder.query_states('SELECT * FROM states')
-        events = recorder.query_events('SELECT * FROM events WHERE '
-                                       'event_type LIKE "EVENT_TEST%"')
-        self.assertEqual(len(states), 5)
-        self.assertEqual(len(events), 5)
+        self.assertEqual(states.count(), 5)
+        self.assertEqual(events.count(), 5)
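For completeness, the new-format database can also be inspected outside Home Assistant with nothing more than SQLAlchemy and the models introduced above; a minimal sketch, assuming a default-location home-assistant_v2.db (adjust the path to your installation):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from homeassistant.components.recorder import models

# The path below assumes a default config dir; adjust as needed.
engine = create_engine(
    'sqlite:////home/user/.homeassistant/home-assistant_v2.db')
session = sessionmaker(bind=engine)()

# Count the rows and show the five most recent state changes.
print(session.query(models.States).count(), 'states recorded')
for state in (session.query(models.States)
              .order_by(models.States.last_updated.desc()).limit(5)):
    print(state.entity_id, state.state, state.last_updated)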