Files
supervisor/hassio/api/security.py
Pascal Vizeli 1c49351e66 Refactory code / object handling (#289)
* Refactory code / object handling

* Next step

* fix lint

* Step 2

* Cleanup API code

* cleanup addons code

* cleanup data handling

* Cleanup addons data handling

* Cleanup docker api

* clean docker api p2

* next cleanup round

* cleanup start on snapshots

* update format strings

* fix setup

* fix lint

* fix lint

* fix lint

* fix tox

* Fix wrong import of datetime module

* Fix bug with attributes

* fix extraction

* Update core

* Update logs

* Expand scheduler

* add support for time interval objects

* next updates on tasks

* Fix some things

* Cleanup code / supervisor

* fix lint

* Fix some code styles

* rename stuff

* cleanup api call reload

* fix lock replacment

* fix lint

* fix lint

* fix bug

* fix wrong config links

* fix bugs

* fix bug

* Update version on startup

* Fix some bugs

* fix bug

* Fix snapshot

* Add wait boot options

* fix lint

* fix default config

* fix snapshot

* fix snapshot

* load snapshots on startup

* add log message at the end

* Some cleanups

* fix bug

* add logger

* add logger for supervisor update

* Add more logger
2018-01-02 21:21:29 +01:00

99 lines
2.9 KiB
Python

"""Init file for HassIO security rest api."""
from datetime import datetime, timedelta
import io
import logging
import hashlib
import os
from aiohttp import web
import voluptuous as vol
import pyotp
import pyqrcode
from .utils import api_process, api_validate, hash_password
from ..const import ATTR_INITIALIZE, ATTR_PASSWORD, ATTR_TOTP, ATTR_SESSION
from ..coresys import CoreSysAttributes
_LOGGER = logging.getLogger(__name__)
SCHEMA_PASSWORD = vol.Schema({
vol.Required(ATTR_PASSWORD): vol.Coerce(str),
})
SCHEMA_SESSION = SCHEMA_PASSWORD.extend({
vol.Optional(ATTR_TOTP, default=None): vol.Coerce(str),
})
class APISecurity(CoreSysAttributes):
"""Handle rest api for security functions."""
def _check_password(self, body):
"""Check if password is valid and security is initialize."""
if not self._config.security_initialize:
raise RuntimeError("First set a password")
password = hash_password(body[ATTR_PASSWORD])
if password != self._config.security_password:
raise RuntimeError("Wrong password")
@api_process
async def info(self, request):
"""Return host information."""
return {
ATTR_INITIALIZE: self._config.security_initialize,
ATTR_TOTP: self._config.security_totp is not None,
}
@api_process
async def options(self, request):
"""Set options / password."""
body = await api_validate(SCHEMA_PASSWORD, request)
if self._config.security_initialize:
raise RuntimeError("Password is already set!")
self._config.security_password = hash_password(body[ATTR_PASSWORD])
self._config.security_initialize = True
return True
@api_process
async def totp(self, request):
"""Set and initialze TOTP."""
body = await api_validate(SCHEMA_PASSWORD, request)
self._check_password(body)
# generate TOTP
totp_init_key = pyotp.random_base32()
totp = pyotp.TOTP(totp_init_key)
# init qrcode
buff = io.BytesIO()
qrcode = pyqrcode.create(totp.provisioning_uri("Hass.IO"))
qrcode.svg(buff)
# finish
self._config.security_totp = totp_init_key
return web.Response(body=buff.getvalue(), content_type='image/svg+xml')
@api_process
async def session(self, request):
"""Set and initialze session."""
body = await api_validate(SCHEMA_SESSION, request)
self._check_password(body)
# check TOTP
if self._config.security_totp:
totp = pyotp.TOTP(self._config.security_totp)
if body[ATTR_TOTP] != totp.now():
raise RuntimeError("Invalid TOTP token!")
# create session
valid_until = datetime.now() + timedelta(days=1)
session = hashlib.sha256(os.urandom(54)).hexdigest()
# store session
self._config.add_security_session(session, valid_until)
return {ATTR_SESSION: session}