Provide API for 5 last states of entity

This commit is contained in:
Paulus Schoutsen 2015-01-31 20:06:30 -08:00
parent 3c95d80d3e
commit 807ceadf8b
2 changed files with 171 additions and 66 deletions

View File

@ -1,3 +1,9 @@
"""
homeassistant.components.history
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Provide pre-made queries on top of the recorder component.
"""
import re import re
import homeassistant.components.recorder as recorder import homeassistant.components.recorder as recorder
@ -8,45 +14,29 @@ DEPENDENCIES = ['recorder', 'http']
def last_5_states(entity_id): def last_5_states(entity_id):
""" Return the last 5 states for entity_id. """ """ Return the last 5 states for entity_id. """
return recorder.query_states( query = """
"SELECT * FROM states WHERE entity_id=? AND " SELECT * FROM states WHERE entity_id=? AND
"last_changed=last_updated " last_changed=last_updated AND {}
"ORDER BY last_changed DESC LIMIT 0, 5", (entity_id, )) ORDER BY last_changed DESC LIMIT 0, 5
""".format(recorder.limit_to_run())
return recorder.query_states(query, (entity_id, ))
def last_5_events():
""" Return the last 5 events (dev method). """
return recorder.query_events(
"SELECT * FROM events ORDER BY created DESC LIMIT 0, 5")
def states_history(point_in_time):
""" Return states at a specific point in time. """
# Find homeassistant.start before point_in_time
# Find last state for each entity after homeassistant.start
# Ignore all states where state == ''
pass
def setup(hass, config): def setup(hass, config):
""" Setup history hooks. """ """ Setup history hooks. """
hass.http.register_path( hass.http.register_path(
'GET', 'GET',
re.compile('/api/component/recorder/(?P<entity_id>[a-zA-Z\._0-9]+)/last_5_states'), re.compile(
_api_last_5_states), r'/api/history/(?P<entity_id>[a-zA-Z\._0-9]+)/recent_states'),
hass.http.register_path( _api_last_5_states)
'GET',
re.compile('/api/component/recorder/last_5_events'), return True
_api_last_5_events),
# pylint: disable=invalid-name # pylint: disable=invalid-name
def _api_last_5_states(handler, path_match, data): def _api_last_5_states(handler, path_match, data):
""" Return the last 5 states for an entity id as JSON. """
entity_id = path_match.group('entity_id') entity_id = path_match.group('entity_id')
handler.write_json(list(last_5_states(entity_id))) handler.write_json(list(last_5_states(entity_id)))
# pylint: disable=invalid-name
def _api_last_5_events(handler, path_match, data):
handler.write_json(list(last_5_events))

View File

@ -1,3 +1,10 @@
"""
homeassistant.components.recorder
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Component that records all events and state changes.
Allows other components to query this database.
"""
import logging import logging
import threading import threading
import queue import queue
@ -5,6 +12,7 @@ import sqlite3
from datetime import datetime from datetime import datetime
import time import time
import json import json
import atexit
from homeassistant import Event, EventOrigin, State from homeassistant import Event, EventOrigin, State
from homeassistant.remote import JSONEncoder from homeassistant.remote import JSONEncoder
@ -16,25 +24,36 @@ DOMAIN = "recorder"
DEPENDENCIES = [] DEPENDENCIES = []
DB_FILE = 'home-assistant.db' DB_FILE = 'home-assistant.db'
RETURN_ROWCOUNT = "rowcount"
RETURN_LASTROWID = "lastrowid"
RETURN_ONE_ROW = "one_row"
_INSTANCE = None _INSTANCE = None
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def query(query, arguments): def query(sql_query, arguments=None):
""" Query the database. """ """ Query the database. """
verify_instance() _verify_instance()
return _INSTANCE.query(query, arguments) return _INSTANCE.query(sql_query, arguments)
def query_states(state_query, arguments): def query_states(state_query, arguments=None):
""" Query the database and return a list of states. """ """ Query the database and return a list of states. """
return filter(None, (row_to_state(row) for row in query(state_query, arguments))) return (
row for row in
(row_to_state(row) for row in query(state_query, arguments))
if row is not None)
def query_events(event_query, arguments): def query_events(event_query, arguments=None):
""" Query the database and return a list of states. """ """ Query the database and return a list of states. """
return filter(None, (row_to_event(row) for row in query(event_query, arguments))) return (
row for row in
(row_to_event(row) for row in query(event_query, arguments))
if row is not None)
def row_to_state(row): def row_to_state(row):
@ -44,6 +63,7 @@ def row_to_state(row):
row[1], row[2], json.loads(row[3]), datetime.fromtimestamp(row[4])) row[1], row[2], json.loads(row[3]), datetime.fromtimestamp(row[4]))
except ValueError: except ValueError:
# When json.loads fails # When json.loads fails
_LOGGER.exception("Error converting row to state: %s", row)
return None return None
@ -53,17 +73,48 @@ def row_to_event(row):
return Event(row[1], json.loads(row[2]), EventOrigin[row[3].lower()]) return Event(row[1], json.loads(row[2]), EventOrigin[row[3].lower()])
except ValueError: except ValueError:
# When json.oads fails # When json.oads fails
_LOGGER.exception("Error converting row to event: %s", row)
return None return None
def verify_instance(): def limit_to_run(point_in_time=None):
""" Raise error if recorder is not setup. """ """
if _INSTANCE is None: Returns a WHERE partial that will limit query to a run.
raise RuntimeError("Recorder not initialized.") A run starts when Home Assistant starts and ends when it stops.
"""
_verify_instance()
end_event = None
# Targetting current run
if point_in_time is None:
return "created >= {}".format(
_adapt_datetime(_INSTANCE.recording_start))
start_event = query(
("SELECT * FROM events WHERE event_type = ? AND created < ? "
"ORDER BY created DESC LIMIT 0, 1"),
(EVENT_HOMEASSISTANT_START, point_in_time))[0]
end_query = query(
("SELECT * FROM events WHERE event_type = ? AND created > ? "
"ORDER BY created ASC LIMIT 0, 1"),
(EVENT_HOMEASSISTANT_START, point_in_time))
if end_query:
end_event = end_query[0]
where_part = "created >= {}".format(start_event['created'])
if end_event is None:
return where_part
else:
return "{} and created < {}".format(where_part, end_event['created'])
def setup(hass, config): def setup(hass, config):
""" Setup the recorder. """ """ Setup the recorder. """
# pylint: disable=global-statement
global _INSTANCE global _INSTANCE
_INSTANCE = Recorder(hass) _INSTANCE = Recorder(hass)
@ -83,11 +134,11 @@ class Recorder(threading.Thread):
self.queue = queue.Queue() self.queue = queue.Queue()
self.quit_object = object() self.quit_object = object()
self.lock = threading.Lock() self.lock = threading.Lock()
self.recording_start = datetime.now()
def start_recording(event): def start_recording(event):
""" Start recording. """ """ Start recording. """
self.start() self.start()
hass.states.set('paulus.held', 'juist', {'nou en': 'bier'})
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording) hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_recording)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown) hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
@ -96,11 +147,13 @@ class Recorder(threading.Thread):
def run(self): def run(self):
""" Start processing events to save. """ """ Start processing events to save. """
self._setup_connection() self._setup_connection()
self._setup_run()
while True: while True:
event = self.queue.get() event = self.queue.get()
if event == self.quit_object: if event == self.quit_object:
self._close_run()
self._close_connection() self._close_connection()
return return
@ -150,70 +203,132 @@ class Recorder(threading.Thread):
"event_type, event_data, origin, created" "event_type, event_data, origin, created"
") values (?, ?, ?, ?)", info) ") values (?, ?, ?, ?)", info)
def query(self, query, data=None): def query(self, sql_query, data=None, return_value=None):
""" Query the database. """ """ Query the database. """
try: try:
with self.conn, self.lock: with self.conn, self.lock:
_LOGGER.info("Running query %s", sql_query)
cur = self.conn.cursor() cur = self.conn.cursor()
if data is not None: if data is not None:
cur.execute(query, data) cur.execute(sql_query, data)
else: else:
cur.execute(query) cur.execute(sql_query)
return cur.fetchall() if return_value == RETURN_ROWCOUNT:
return cur.rowcount
elif return_value == RETURN_LASTROWID:
return cur.lastrowid
elif return_value == RETURN_ONE_ROW:
return cur.fetchone()
else:
return cur.fetchall()
except sqlite3.IntegrityError: except sqlite3.IntegrityError:
_LOGGER.exception("Error querying the database using: %s", query) _LOGGER.exception(
"Error querying the database using: %s", sql_query)
return [] return []
def _setup_connection(self): def _setup_connection(self):
""" Ensure database is ready to fly. """ """ Ensure database is ready to fly. """
db_path = self.hass.get_config_path(DB_FILE) db_path = self.hass.get_config_path(DB_FILE)
self.conn = sqlite3.connect(db_path, check_same_thread=False) self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
# Make sure the database is closed whenever Python exits
# without the STOP event being fired.
atexit.register(self._close_connection)
# Have datetime objects be saved as integers # Have datetime objects be saved as integers
sqlite3.register_adapter(datetime, adapt_datetime) sqlite3.register_adapter(datetime, _adapt_datetime)
# Validate we are on the correct schema or that we have to migrate # Validate we are on the correct schema or that we have to migrate
c = self.conn.cursor() cur = self.conn.cursor()
def save_migration(migration_id): def save_migration(migration_id):
c.execute('INSERT INTO schema_version VALUES (?, ?)', """ Save and commit a migration to the database. """
(migration_id, datetime.now())) cur.execute('INSERT INTO schema_version VALUES (?, ?)',
(migration_id, datetime.now()))
self.conn.commit() self.conn.commit()
_LOGGER.info("Database migrated to version %d", migration_id) _LOGGER.info("Database migrated to version %d", migration_id)
try: try:
c.execute('SELECT max(migration_id) FROM schema_version;') cur.execute('SELECT max(migration_id) FROM schema_version;')
migration_id = c.fetchone()[0] or 0 migration_id = cur.fetchone()[0] or 0
except sqlite3.OperationalError: except sqlite3.OperationalError:
# The table does not exist # The table does not exist
c.execute('CREATE TABLE schema_version ' cur.execute('CREATE TABLE schema_version ('
'(migration_id integer primary key, performed integer)') 'migration_id integer primary key, performed integer)')
migration_id = 0 migration_id = 0
if migration_id < 1: if migration_id < 1:
c.execute( cur.execute("""
'CREATE TABLE events (event_id integer primary key, ' CREATE TABLE recorder_runs (
'event_type text, event_data text, origin text, ' run_id integer primary key,
'created integer)') start integer,
c.execute('CREATE INDEX events__event_type ON events(event_type)') end integer,
closed_incorrect integer default 0,
created integer)
""")
c.execute( cur.execute("""
'CREATE TABLE states (state_id integer primary key, ' CREATE TABLE events (
'entity_id text, state text, attributes text, ' event_id integer primary key,
'last_changed integer, last_updated integer, created integer)') event_type text,
c.execute('CREATE INDEX states__entity_id ON states(entity_id)') event_data text,
origin text,
created integer)
""")
cur.execute(
'CREATE INDEX events__event_type ON events(event_type)')
cur.execute("""
CREATE TABLE states (
state_id integer primary key,
entity_id text,
state text,
attributes text,
last_changed integer,
last_updated integer,
created integer)
""")
cur.execute('CREATE INDEX states__entity_id ON states(entity_id)')
save_migration(1) save_migration(1)
def _close_connection(self): def _close_connection(self):
""" Close connection to the database. """
_LOGGER.info("Closing database") _LOGGER.info("Closing database")
atexit.unregister(self._close_connection)
self.conn.close() self.conn.close()
def _setup_run(self):
""" Log the start of the current run. """
if self.query("""UPDATE recorder_runs SET end=?, closed_incorrect=1
WHERE end IS NULL""", (self.recording_start, ),
return_value=RETURN_ROWCOUNT):
# Setup datetime to save as a integer _LOGGER.warning("Found unfinished sessions")
def adapt_datetime(ts):
return time.mktime(ts.timetuple()) self.query(
"INSERT INTO recorder_runs (start, created) VALUES (?, ?)",
(self.recording_start, datetime.now()))
def _close_run(self):
""" Save end time for current run. """
self.query(
"UPDATE recorder_runs SET end=? WHERE start=?",
(datetime.now(), self.recording_start))
def _adapt_datetime(datetimestamp):
""" Turn a datetime into an integer for in the DB. """
return time.mktime(datetimestamp.timetuple())
def _verify_instance():
""" throws error if recorder not initialized. """
if _INSTANCE is None:
raise RuntimeError("Recorder not initialized.")