supervisor/hassio/tools.py
Pascal Vizeli 5a80be9fd4 Allow stop/start home-assistant & flow of startup (#182)
* Allow config boot

* Read boot settings

* Use internal boot time for detect reboot

* Check if Home-Assistant need to watch

* Make datetime string and parse_datetime

* Add api calls

* fix lint p1

* Use new datetime parser for sessions and make a real default boot time

* fix lint p2

* only start docker if they is running

* convert to int (timestamp)

* add boot flag
2017-09-11 14:14:26 +02:00

166 lines
5.0 KiB
Python

"""Tools file for HassIO."""
import asyncio
from contextlib import suppress
from datetime import datetime, timedelta, timezone
import json
import logging
import re
import aiohttp
import async_timeout
import pytz
import voluptuous as vol
from voluptuous.humanize import humanize_error
_LOGGER = logging.getLogger(__name__)
FREEGEOIP_URL = "https://freegeoip.io/json/"
RE_STRING = re.compile(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))")
# Copyright (c) Django Software Foundation and individual contributors.
# All rights reserved.
# https://github.com/django/django/blob/master/LICENSE
DATETIME_RE = re.compile(
r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
r'[T ](?P<hour>\d{1,2}):(?P<minute>\d{1,2})'
r'(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?'
r'(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$'
)
def write_json_file(jsonfile, data):
"""Write a json file."""
try:
json_str = json.dumps(data, indent=2)
with jsonfile.open('w') as conf_file:
conf_file.write(json_str)
except (OSError, json.JSONDecodeError):
return False
return True
def read_json_file(jsonfile):
"""Read a json file and return a dict."""
with jsonfile.open('r') as cfile:
return json.loads(cfile.read())
async def fetch_timezone(websession):
"""Read timezone from freegeoip."""
data = {}
with suppress(aiohttp.ClientError, asyncio.TimeoutError,
json.JSONDecodeError, KeyError):
with async_timeout.timeout(10, loop=websession.loop):
async with websession.get(FREEGEOIP_URL) as request:
data = await request.json()
return data.get('time_zone', 'UTC')
def convert_to_ascii(raw):
"""Convert binary to ascii and remove colors."""
return RE_STRING.sub("", raw.decode())
# Copyright (c) Django Software Foundation and individual contributors.
# All rights reserved.
# https://github.com/django/django/blob/master/LICENSE
def parse_datetime(dt_str):
"""Parse a string and return a datetime.datetime.
This function supports time zone offsets. When the input contains one,
the output uses a timezone with a fixed offset from UTC.
Raises ValueError if the input is well formatted but not a valid datetime.
Returns None if the input isn't well formatted.
"""
match = DATETIME_RE.match(dt_str)
if not match:
return None
kws = match.groupdict() # type: Dict[str, Any]
if kws['microsecond']:
kws['microsecond'] = kws['microsecond'].ljust(6, '0')
tzinfo_str = kws.pop('tzinfo')
tzinfo = None # type: Optional[dt.tzinfo]
if tzinfo_str == 'Z':
tzinfo = pytz.utc
elif tzinfo_str is not None:
offset_mins = int(tzinfo_str[-2:]) if len(tzinfo_str) > 3 else 0
offset_hours = int(tzinfo_str[1:3])
offset = timedelta(hours=offset_hours, minutes=offset_mins)
if tzinfo_str[0] == '-':
offset = -offset
tzinfo = timezone(offset)
else:
tzinfo = None
kws = {k: int(v) for k, v in kws.items() if v is not None}
kws['tzinfo'] = tzinfo
return datetime(**kws)
class JsonConfig(object):
"""Hass core object for handle it."""
def __init__(self, json_file, schema):
"""Initialize hass object."""
self._file = json_file
self._schema = schema
self._data = {}
# init or load data
if self._file.is_file():
try:
self._data = read_json_file(self._file)
except (OSError, json.JSONDecodeError):
_LOGGER.warning("Can't read %s", self._file)
self._data = {}
# validate
try:
self._data = self._schema(self._data)
except vol.Invalid as ex:
_LOGGER.error("Can't parse %s -> %s",
self._file, humanize_error(self._data, ex))
def save(self):
"""Store data to config file."""
# validate
try:
self._data = self._schema(self._data)
except vol.Invalid as ex:
_LOGGER.error("Can't parse data -> %s",
humanize_error(self._data, ex))
return False
# write
if not write_json_file(self._file, self._data):
_LOGGER.error("Can't store config in %s", self._file)
return False
return True
class AsyncThrottle(object):
"""
Decorator that prevents a function from being called more than once every
time period.
"""
def __init__(self, delta):
"""Initialize async throttle."""
self.throttle_period = delta
self.time_of_last_call = datetime.min
def __call__(self, method):
"""Throttle function"""
async def wrapper(*args, **kwargs):
"""Throttle function wrapper"""
now = datetime.now()
time_since_last_call = now - self.time_of_last_call
if time_since_last_call > self.throttle_period:
self.time_of_last_call = now
return await method(*args, **kwargs)
return wrapper