# Origin: commit 215987d5a7ac4a0b1d808d3eb642b1ee15cf1bb3 by Jan Losinski
# (Mon, 24 Apr 2017), "Add script to import state events to InfluxDB (#7254)",
# which created homeassistant/scripts/influxdb_import.py.
"""Script to import recorded data into influxdb.

Reads the recorded ``state_changed`` events from a recorder database and
writes them as points to an InfluxDB instance. This can be useful for the
initial import after setting up InfluxDB.
"""
import argparse
import json
import os

from collections import defaultdict
from typing import List

import homeassistant.config as config_util


def _build_parser() -> argparse.ArgumentParser:
    """Create the command line parser for the import script."""
    parser = argparse.ArgumentParser(
        description="import data to influxDB.")
    parser.add_argument(
        '-c', '--config',
        metavar='path_to_config_dir',
        default=config_util.get_default_config_dir(),
        help="Directory that contains the Home Assistant configuration")
    parser.add_argument(
        '--uri',
        type=str,
        help="Connect to URI and import (if other than default sqlite) "
             "eg: mysql://localhost/homeassistant")
    parser.add_argument(
        '-d', '--dbname',
        metavar='dbname',
        required=True,
        help="InfluxDB database name")
    parser.add_argument(
        '-H', '--host',
        metavar='host',
        default='127.0.0.1',
        help="InfluxDB host address")
    parser.add_argument(
        '-P', '--port',
        metavar='port',
        type=int,
        default=8086,
        help="InfluxDB host port")
    parser.add_argument(
        '-u', '--username',
        metavar='username',
        default='root',
        help="InfluxDB username")
    parser.add_argument(
        '-p', '--password',
        metavar='password',
        default='root',
        help="InfluxDB password")
    parser.add_argument(
        '-s', '--step',
        metavar='step',
        type=int,
        default=1000,
        help="How many points to import at the same time")
    parser.add_argument(
        '-t', '--tags',
        metavar='tags',
        default="",
        help="Comma separated list of tags (key:value) for all points")
    parser.add_argument(
        '-D', '--default-measurement',
        metavar='default_measurement',
        default="",
        help="Measurement name to use when an entity has no "
             "unit_of_measurement")
    parser.add_argument(
        '-o', '--override-measurement',
        metavar='override_measurement',
        default="",
        help="Store all your points in the same measurement")
    parser.add_argument(
        '-e', '--exclude_entities',
        metavar='exclude_entities',
        default="",
        help="Comma separated list of excluded entities")
    parser.add_argument(
        '-E', '--exclude_domains',
        metavar='exclude_domains',
        default="",
        help="Comma separated list of excluded domains")
    parser.add_argument(
        "-S", "--simulate",
        default=False,
        action="store_true",
        help=("Do not write points but simulate preprocessing and print "
              "statistics"))
    parser.add_argument(
        '--script',
        choices=['influxdb_import'])
    return parser


def _split_csv(raw: str) -> set:
    """Split a comma separated option into a set, dropping empty items."""
    return {item.strip() for item in raw.split(",") if item.strip()}


def _parse_tags(raw: str) -> dict:
    """Parse a "key:value,key:value" option into a dict.

    Only the first colon of each item separates key from value, so values
    may themselves contain colons.

    Raises:
        ValueError: if an item has no colon or an empty key.
    """
    tags = {}
    for item in raw.split(","):
        item = item.strip()
        if not item:
            continue
        key, sep, value = item.partition(":")
        if not sep or not key:
            raise ValueError(
                "Invalid tag '{}', expected key:value".format(item))
        tags[key] = value
    return tags


def _add_attribute_fields(fields: dict, attributes) -> None:
    """Copy state attributes into the fields dict of a point (in place)."""
    for key, value in attributes.items():
        # The unit is used to pick the measurement, not stored as a field.
        if key == 'unit_of_measurement':
            continue
        # If the key is already in fields, avoid overwriting it.
        if key in fields:
            key = key + "_"
        # Prevent column data errors in influxDB.
        # For each value we try to cast it as float
        # But if we can not do it we store the value
        # as string add "_str" postfix to the field key
        try:
            fields[key] = float(value)
        except (ValueError, TypeError):
            fields["{}_str".format(key)] = str(value)


def _write_batch(client, points: list, simulate: bool) -> int:
    """Write one batch of points unless simulating; return the batch size."""
    if not simulate:
        print("Write {} points to the database".format(len(points)))
        client.write_points(points)
    return len(points)


def run(script_args: List) -> int:
    """The actual script body.

    Parses the command line, reads all ``state_changed`` events from the
    recorder database ordered by ``time_fired``, converts each new state
    into an InfluxDB point and writes the points in batches of ``--step``.
    With ``--simulate`` nothing is written, but the statistics are still
    printed.

    Args:
        script_args: Unused. Arguments are parsed from ``sys.argv``.

    Returns:
        0 on success, 1 when the configuration directory, the source
        database or the ``--tags`` option is invalid.
    """
    # Imported lazily, as in the original script; presumably because these
    # packages are optional dependencies -- TODO confirm.
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from influxdb import InfluxDBClient
    from homeassistant.components.recorder import models
    from homeassistant.helpers import state as state_helper
    from homeassistant.core import State

    # parse_args() without arguments reads sys.argv[1:]. The '--script'
    # option presumably exists so the launcher's own flag is accepted here
    # -- verify against the caller before switching to script_args.
    args = _build_parser().parse_args()
    simulate = args.simulate

    config_dir = os.path.join(os.getcwd(), args.config)  # type: str

    # Test if configuration directory exists. A missing *default*
    # directory is tolerated here because --uri may point elsewhere.
    if (not os.path.isdir(config_dir) and
            config_dir != config_util.get_default_config_dir()):
        print(('Fatal Error: Specified configuration directory does '
               'not exist {} ').format(config_dir))
        return 1

    src_db = '{}/home-assistant_v2.db'.format(config_dir)

    if not os.path.exists(src_db) and not args.uri:
        print("Fatal Error: Database '{}' does not exist "
              "and no uri given".format(src_db))
        return 1

    try:
        tags = _parse_tags(args.tags)
    except ValueError as err:
        print("Fatal Error: {}".format(err))
        return 1

    excl_entities = _split_csv(args.exclude_entities)
    excl_domains = _split_csv(args.exclude_domains)
    override_measurement = args.override_measurement
    default_measurement = args.default_measurement
    step = args.step

    # Only create the client once all local validation has passed.
    client = None
    if not simulate:
        client = InfluxDBClient(args.host, args.port,
                                args.username, args.password)
        client.switch_database(args.dbname)

    uri = args.uri or "sqlite:///{}".format(src_db)
    engine = create_engine(uri, echo=False)
    session = sessionmaker(bind=engine)()

    points = []
    count = 0
    entities = defaultdict(int)

    try:
        query = session.query(models.Events).filter(
            models.Events.event_type == "state_changed").order_by(
                models.Events.time_fired)

        for event in query:
            event_data = json.loads(event.event_data)
            state = State.from_dict(event_data.get("new_state"))

            if (not state or state.entity_id in excl_entities or
                    state.domain in excl_domains):
                session.expunge(event)
                continue

            # Numeric states go to the "value" field, others to "state".
            try:
                _state = float(state_helper.state_as_number(state))
                _state_key = "value"
            except ValueError:
                _state = state.state
                _state_key = "state"

            if override_measurement:
                measurement = override_measurement
            else:
                measurement = state.attributes.get('unit_of_measurement')
                if measurement in (None, ''):
                    measurement = default_measurement or state.entity_id

            point = {
                'measurement': measurement,
                'tags': {
                    'domain': state.domain,
                    'entity_id': state.object_id,
                },
                'time': event.time_fired,
                'fields': {
                    _state_key: _state,
                }
            }
            _add_attribute_fields(point['fields'], state.attributes)

            entities[state.entity_id] += 1
            point['tags'].update(tags)
            points.append(point)
            # Detach the processed row from the session.
            session.expunge(event)

            if len(points) >= step:
                count += _write_batch(client, points, simulate)
                points = []

        # Flush the final, partially filled batch.
        if points:
            count += _write_batch(client, points, simulate)
    finally:
        # Release the database connection even if a write fails.
        session.close()

    print("\nStatistics:")
    print("\n".join(["{:6}: {}".format(v, k) for k, v
                     in sorted(entities.items(), key=lambda x: x[1])]))
    print("\nImport finished {} points written".format(count))
    return 0