mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-10-07 02:39:32 +00:00

* Refactory code / object handling * Next step * fix lint * Step 2 * Cleanup API code * cleanup addons code * cleanup data handling * Cleanup addons data handling * Cleanup docker api * clean docker api p2 * next cleanup round * cleanup start on snapshots * update format strings * fix setup * fix lint * fix lint * fix lint * fix tox * Fix wrong import of datetime module * Fix bug with attributes * fix extraction * Update core * Update logs * Expand scheduler * add support for time interval objects * next updates on tasks * Fix some things * Cleanup code / supervisor * fix lint * Fix some code styles * rename stuff * cleanup api call reload * fix lock replacment * fix lint * fix lint * fix bug * fix wrong config links * fix bugs * fix bug * Update version on startup * Fix some bugs * fix bug * Fix snapshot * Add wait boot options * fix lint * fix default config * fix snapshot * fix snapshot * load snapshots on startup * add log message at the end * Some cleanups * fix bug * add logger * add logger for supervisor update * Add more logger
99 lines
2.9 KiB
Python
99 lines
2.9 KiB
Python
"""Init file for HassIO security rest api."""
|
|
from datetime import datetime, timedelta
|
|
import io
|
|
import logging
|
|
import hashlib
|
|
import os
|
|
|
|
from aiohttp import web
|
|
import voluptuous as vol
|
|
import pyotp
|
|
import pyqrcode
|
|
|
|
from .utils import api_process, api_validate, hash_password
|
|
from ..const import ATTR_INITIALIZE, ATTR_PASSWORD, ATTR_TOTP, ATTR_SESSION
|
|
from ..coresys import CoreSysAttributes
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
SCHEMA_PASSWORD = vol.Schema({
|
|
vol.Required(ATTR_PASSWORD): vol.Coerce(str),
|
|
})
|
|
|
|
SCHEMA_SESSION = SCHEMA_PASSWORD.extend({
|
|
vol.Optional(ATTR_TOTP, default=None): vol.Coerce(str),
|
|
})
|
|
|
|
|
|
class APISecurity(CoreSysAttributes):
|
|
"""Handle rest api for security functions."""
|
|
|
|
def _check_password(self, body):
|
|
"""Check if password is valid and security is initialize."""
|
|
if not self._config.security_initialize:
|
|
raise RuntimeError("First set a password")
|
|
|
|
password = hash_password(body[ATTR_PASSWORD])
|
|
if password != self._config.security_password:
|
|
raise RuntimeError("Wrong password")
|
|
|
|
@api_process
|
|
async def info(self, request):
|
|
"""Return host information."""
|
|
return {
|
|
ATTR_INITIALIZE: self._config.security_initialize,
|
|
ATTR_TOTP: self._config.security_totp is not None,
|
|
}
|
|
|
|
@api_process
|
|
async def options(self, request):
|
|
"""Set options / password."""
|
|
body = await api_validate(SCHEMA_PASSWORD, request)
|
|
|
|
if self._config.security_initialize:
|
|
raise RuntimeError("Password is already set!")
|
|
|
|
self._config.security_password = hash_password(body[ATTR_PASSWORD])
|
|
self._config.security_initialize = True
|
|
return True
|
|
|
|
@api_process
|
|
async def totp(self, request):
|
|
"""Set and initialze TOTP."""
|
|
body = await api_validate(SCHEMA_PASSWORD, request)
|
|
self._check_password(body)
|
|
|
|
# generate TOTP
|
|
totp_init_key = pyotp.random_base32()
|
|
totp = pyotp.TOTP(totp_init_key)
|
|
|
|
# init qrcode
|
|
buff = io.BytesIO()
|
|
|
|
qrcode = pyqrcode.create(totp.provisioning_uri("Hass.IO"))
|
|
qrcode.svg(buff)
|
|
|
|
# finish
|
|
self._config.security_totp = totp_init_key
|
|
return web.Response(body=buff.getvalue(), content_type='image/svg+xml')
|
|
|
|
@api_process
|
|
async def session(self, request):
|
|
"""Set and initialze session."""
|
|
body = await api_validate(SCHEMA_SESSION, request)
|
|
self._check_password(body)
|
|
|
|
# check TOTP
|
|
if self._config.security_totp:
|
|
totp = pyotp.TOTP(self._config.security_totp)
|
|
if body[ATTR_TOTP] != totp.now():
|
|
raise RuntimeError("Invalid TOTP token!")
|
|
|
|
# create session
|
|
valid_until = datetime.now() + timedelta(days=1)
|
|
session = hashlib.sha256(os.urandom(54)).hexdigest()
|
|
|
|
# store session
|
|
self._config.add_security_session(session, valid_until)
|
|
return {ATTR_SESSION: session}
|