mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-06-24 02:46:30 +00:00
87 lines
2.6 KiB
Python
87 lines
2.6 KiB
Python
"""Handle security part of this API."""
|
|
import logging
|
|
import re
|
|
|
|
from aiohttp.web import middleware
|
|
from aiohttp.web_exceptions import HTTPUnauthorized, HTTPForbidden
|
|
|
|
from ..const import HEADER_TOKEN, REQUEST_FROM
|
|
from ..coresys import CoreSysAttributes
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
# Request paths that are passed straight to the handler without any token
# validation.  The group must not start with "|": a leading bar adds an empty
# alternative, which would make the empty string count as a whitelisted path.
NO_SECURITY_CHECK = re.compile(
    r"^(?:"
    r"/homeassistant/api/.*"
    r"|/homeassistant/websocket"
    r"|/supervisor/ping"
    r")$"
)
|
|
|
|
# Request paths an add-on may call even when it has no general API access
# (see its use in SecurityMiddleware.token_validation).  As with the pattern
# above, the group must not start with "|", otherwise the empty string would
# be accepted through an empty alternative.
ADDONS_API_BYPASS = re.compile(
    r"^(?:"
    r"/homeassistant/info"
    r"|/supervisor/info"
    r"|/addons(?:/self/[^/]+)?"
    r")$"
)
|
|
|
|
|
|
class SecurityMiddleware(CoreSysAttributes):
    """Token based access control for incoming API requests."""

    def __init__(self, coresys):
        """Keep a reference to the core system object."""
        self.coresys = coresys

    @middleware
    async def token_validation(self, request, handler):
        """Identify the caller by its API token and forward the request.

        Paths matching NO_SECURITY_CHECK are forwarded without any check.
        Otherwise the token from the HEADER_TOKEN header is compared against
        Home-Assistant, the host machine id and the known add-ons; the
        resolved origin is stored on the request under REQUEST_FROM before
        the handler is called.

        Raises:
            HTTPUnauthorized: no token was sent with the request.
            HTTPForbidden: the token did not resolve to an allowed caller.
        """
        token = request.headers.get(HEADER_TOKEN)
        path = request.path

        # Whitelisted paths skip the token check entirely
        if NO_SECURITY_CHECK.match(path):
            _LOGGER.debug("Passthrough %s", path)
            return await handler(request)

        # Everything else needs a token
        if not token:
            _LOGGER.warning("No API token provided for %s", path)
            raise HTTPUnauthorized()

        origin = None

        # Home-Assistant
        # NOTE: the UUID comparison is marked for removal with 131
        homeassistant = self.sys_homeassistant
        if token in (homeassistant.uuid, homeassistant.hassio_token):
            _LOGGER.debug("%s access from Home-Assistant", path)
            origin = 'homeassistant'

        # Host
        if token == self.sys_machine_id:
            _LOGGER.debug("%s access from Host", path)
            origin = 'host'

        # Add-on: only looked up when no other origin matched
        if not origin:
            addons = self.sys_addons
            # NOTE: the UUID fallback is marked for removal with 131
            addon = addons.from_token(token) or addons.from_uuid(token)

            if addon:
                if addon.access_hassio_api:
                    # Add-on with API access
                    _LOGGER.info("%s access from %s", path, addon.slug)
                    origin = addon.slug
                elif ADDONS_API_BYPASS.match(path):
                    # Add-on without API access, but the path is always allowed
                    _LOGGER.debug("Passthrough %s from %s", path, addon.slug)
                    origin = addon.slug

        if not origin:
            _LOGGER.warning("Invalid token for access %s", path)
            raise HTTPForbidden()

        request[REQUEST_FROM] = origin
        return await handler(request)
|