supervisor/hassio/homeassistant.py
Pascal Vizeli 67f562a846
Init ingress session boarder / lookup (#995)
* Init ingress session boarder / lookup

* Add session to API

* Add cokkie validate

* Do it without event bus

* Add logger

* Fix validation

* Add tests

* Update tests

* Mock json storage
2019-04-05 17:36:07 +02:00

555 lines
18 KiB
Python

"""Home Assistant control object."""
import asyncio
from contextlib import asynccontextmanager, suppress
from datetime import datetime, timedelta
from ipaddress import IPv4Address
import logging
import os
from pathlib import Path
import re
import secrets
import socket
import time
from typing import Any, AsyncContextManager, Awaitable, Dict, Optional
from uuid import UUID
import aiohttp
from aiohttp import hdrs
import attr
from .const import (
ATTR_ACCESS_TOKEN,
ATTR_BOOT,
ATTR_IMAGE,
ATTR_LAST_VERSION,
ATTR_PASSWORD,
ATTR_PORT,
ATTR_REFRESH_TOKEN,
ATTR_SSL,
ATTR_UUID,
ATTR_WAIT_BOOT,
ATTR_WATCHDOG,
FILE_HASSIO_HOMEASSISTANT,
HEADER_HA_ACCESS,
)
from .coresys import CoreSys, CoreSysAttributes
from .docker.homeassistant import DockerHomeAssistant
from .docker.stats import DockerStats
from .exceptions import (
DockerAPIError,
HomeAssistantAPIError,
HomeAssistantAuthError,
HomeAssistantError,
HomeAssistantUpdateError,
)
from .utils import convert_to_ascii, process_lock
from .utils.json import JsonConfig
from .validate import SCHEMA_HASS_CONFIG
_LOGGER = logging.getLogger(__name__)
RE_YAML_ERROR = re.compile(r"homeassistant\.util\.yaml")
@attr.s(frozen=True)
class ConfigResult:
"""Return object from config check."""
valid = attr.ib()
log = attr.ib()
class HomeAssistant(JsonConfig, CoreSysAttributes):
"""Home Assistant core object for handle it."""
def __init__(self, coresys: CoreSys):
"""Initialize Home Assistant object."""
super().__init__(FILE_HASSIO_HOMEASSISTANT, SCHEMA_HASS_CONFIG)
self.coresys: CoreSys = coresys
self.instance: DockerHomeAssistant = DockerHomeAssistant(coresys)
self.lock: asyncio.Lock = asyncio.Lock(loop=coresys.loop)
self._error_state: bool = False
# We don't persist access tokens. Instead we fetch new ones when needed
self.access_token: Optional[str] = None
self._access_token_expires: Optional[datetime] = None
async def load(self) -> None:
"""Prepare Home Assistant object."""
with suppress(DockerAPIError):
await self.instance.attach()
return
_LOGGER.info("No Home Assistant Docker image %s found.", self.image)
await self.install_landingpage()
@property
def machine(self) -> str:
"""Return the system machines."""
return self.instance.machine
@property
def arch(self) -> str:
"""Return arch of running Home Assistant."""
return self.instance.arch
@property
def error_state(self) -> bool:
"""Return True if system is in error."""
return self._error_state
@property
def ip_address(self) -> IPv4Address:
"""Return IP of Home Assistant instance."""
return self.instance.ip_address
@property
def api_port(self) -> int:
"""Return network port to Home Assistant instance."""
return self._data[ATTR_PORT]
@api_port.setter
def api_port(self, value: int) -> None:
"""Set network port for Home Assistant instance."""
self._data[ATTR_PORT] = value
@property
def api_password(self) -> str:
"""Return password for Home Assistant instance."""
return self._data.get(ATTR_PASSWORD)
@api_password.setter
def api_password(self, value: str):
"""Set password for Home Assistant instance."""
self._data[ATTR_PASSWORD] = value
@property
def api_ssl(self) -> bool:
"""Return if we need ssl to Home Assistant instance."""
return self._data[ATTR_SSL]
@api_ssl.setter
def api_ssl(self, value: bool):
"""Set SSL for Home Assistant instance."""
self._data[ATTR_SSL] = value
@property
def api_url(self) -> str:
"""Return API url to Home Assistant."""
return "{}://{}:{}".format('https' if self.api_ssl else 'http',
self.ip_address, self.api_port)
@property
def watchdog(self) -> bool:
"""Return True if the watchdog should protect Home Assistant."""
return self._data[ATTR_WATCHDOG]
@watchdog.setter
def watchdog(self, value: bool):
"""Return True if the watchdog should protect Home Assistant."""
self._data[ATTR_WATCHDOG] = value
@property
def wait_boot(self) -> int:
"""Return time to wait for Home Assistant startup."""
return self._data[ATTR_WAIT_BOOT]
@wait_boot.setter
def wait_boot(self, value: int):
"""Set time to wait for Home Assistant startup."""
self._data[ATTR_WAIT_BOOT] = value
@property
def version(self) -> str:
"""Return version of running Home Assistant."""
return self.instance.version
@property
def last_version(self) -> str:
"""Return last available version of Home Assistant."""
if self.is_custom_image:
return self._data.get(ATTR_LAST_VERSION)
return self.sys_updater.version_homeassistant
@last_version.setter
def last_version(self, value: str):
"""Set last available version of Home Assistant."""
if value:
self._data[ATTR_LAST_VERSION] = value
else:
self._data.pop(ATTR_LAST_VERSION, None)
@property
def image(self) -> str:
"""Return image name of the Home Assistant container."""
if self._data.get(ATTR_IMAGE):
return self._data[ATTR_IMAGE]
return os.environ['HOMEASSISTANT_REPOSITORY']
@image.setter
def image(self, value: str):
"""Set image name of Home Assistant container."""
if value:
self._data[ATTR_IMAGE] = value
else:
self._data.pop(ATTR_IMAGE, None)
@property
def is_custom_image(self) -> bool:
"""Return True if a custom image is used."""
return all(
attr in self._data for attr in (ATTR_IMAGE, ATTR_LAST_VERSION))
@property
def boot(self) -> bool:
"""Return True if Home Assistant boot is enabled."""
return self._data[ATTR_BOOT]
@boot.setter
def boot(self, value: bool):
"""Set Home Assistant boot options."""
self._data[ATTR_BOOT] = value
@property
def uuid(self) -> UUID:
"""Return a UUID of this Home Assistant instance."""
return self._data[ATTR_UUID]
@property
def hassio_token(self) -> str:
"""Return an access token for the Hass.io API."""
return self._data.get(ATTR_ACCESS_TOKEN)
@property
def refresh_token(self) -> str:
"""Return the refresh token to authenticate with Home Assistant."""
return self._data.get(ATTR_REFRESH_TOKEN)
@refresh_token.setter
def refresh_token(self, value: str):
"""Set Home Assistant refresh_token."""
self._data[ATTR_REFRESH_TOKEN] = value
@process_lock
async def install_landingpage(self) -> None:
"""Install a landing page."""
_LOGGER.info("Setup HomeAssistant landingpage")
while True:
with suppress(DockerAPIError):
await self.instance.install('landingpage')
return
_LOGGER.warning("Fails install landingpage, retry after 30sec")
await asyncio.sleep(30)
@process_lock
async def install(self) -> None:
"""Install a landing page."""
_LOGGER.info("Setup Home Assistant")
while True:
# read homeassistant tag and install it
if not self.last_version:
await self.sys_updater.reload()
tag = self.last_version
if tag:
with suppress(DockerAPIError):
await self.instance.install(tag)
break
_LOGGER.warning("Error on install Home Assistant. Retry in 30sec")
await asyncio.sleep(30)
# finishing
_LOGGER.info("Home Assistant docker now installed")
try:
if not self.boot:
return
_LOGGER.info("Start Home Assistant")
await self._start()
except HomeAssistantError:
_LOGGER.error("Can't start Home Assistant!")
finally:
with suppress(DockerAPIError):
await self.instance.cleanup()
@process_lock
async def update(self, version=None) -> None:
"""Update HomeAssistant version."""
version = version or self.last_version
rollback = self.version if not self.error_state else None
running = await self.instance.is_running()
exists = await self.instance.exists()
if exists and version == self.instance.version:
_LOGGER.warning("Version %s is already installed", version)
return
# process an update
async def _update(to_version):
"""Run Home Assistant update."""
_LOGGER.info("Update Home Assistant to version %s", to_version)
try:
await self.instance.update(to_version)
except DockerAPIError:
_LOGGER.warning("Update Home Assistant image fails")
raise HomeAssistantUpdateError() from None
if running:
await self._start()
_LOGGER.info("Successful run Home Assistant %s", to_version)
# Update Home Assistant
with suppress(HomeAssistantError):
await _update(version)
return
# Update going wrong, revert it
if self.error_state and rollback:
_LOGGER.fatal("HomeAssistant update fails -> rollback!")
await _update(rollback)
else:
raise HomeAssistantUpdateError()
async def _start(self) -> None:
"""Start Home Assistant Docker & wait."""
if await self.instance.is_running():
_LOGGER.warning("Home Assistant is already running!")
return
# Create new API token
self._data[ATTR_ACCESS_TOKEN] = secrets.token_hex(56)
self.save_data()
try:
await self.instance.run()
except DockerAPIError:
raise HomeAssistantError() from None
await self._block_till_run()
@process_lock
async def start(self) -> None:
"""Run Home Assistant docker."""
try:
if await self.instance.is_running():
await self.instance.restart()
elif await self.instance.is_initialize():
await self.instance.start()
else:
await self._start()
return
await self._block_till_run()
except DockerAPIError:
raise HomeAssistantError() from None
@process_lock
async def stop(self) -> None:
"""Stop Home Assistant Docker.
Return a coroutine.
"""
try:
return await self.instance.stop(remove_container=False)
except DockerAPIError:
raise HomeAssistantError() from None
@process_lock
async def restart(self) -> None:
"""Restart Home Assistant Docker."""
try:
await self.instance.restart()
except DockerAPIError:
raise HomeAssistantError() from None
await self._block_till_run()
@process_lock
async def rebuild(self) -> None:
"""Rebuild Home Assistant Docker container."""
with suppress(DockerAPIError):
await self.instance.stop()
await self._start()
def logs(self) -> Awaitable[bytes]:
"""Get HomeAssistant docker logs.
Return a coroutine.
"""
return self.instance.logs()
async def stats(self) -> DockerStats:
"""Return stats of Home Assistant.
Return a coroutine.
"""
try:
return await self.instance.stats()
except DockerAPIError:
raise HomeAssistantError() from None
def is_running(self) -> Awaitable[bool]:
"""Return True if Docker container is running.
Return a coroutine.
"""
return self.instance.is_running()
def is_fails(self) -> Awaitable[bool]:
"""Return True if a Docker container is fails state.
Return a coroutine.
"""
return self.instance.is_fails()
@property
def in_progress(self) -> bool:
"""Return True if a task is in progress."""
return self.instance.in_progress or self.lock.locked()
async def check_config(self) -> ConfigResult:
"""Run Home Assistant config check."""
result = await self.instance.execute_command(
"python3 -m homeassistant -c /config --script check_config")
# if not valid
if result.exit_code is None:
_LOGGER.error("Fatal error on config check!")
raise HomeAssistantError()
# parse output
log = convert_to_ascii(result.output)
if result.exit_code != 0 or RE_YAML_ERROR.search(log):
_LOGGER.error("Invalid Home Assistant config found!")
return ConfigResult(False, log)
_LOGGER.info("Home Assistant config is valid")
return ConfigResult(True, log)
async def ensure_access_token(self) -> None:
"""Ensures there is an access token."""
if self.access_token is not None and self._access_token_expires > datetime.utcnow():
return
with suppress(asyncio.TimeoutError, aiohttp.ClientError):
async with self.sys_websession_ssl.post(
f"{self.api_url}/auth/token",
timeout=30,
data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token
}) as resp:
if resp.status != 200:
_LOGGER.error("Can't update Home Assistant access token!")
raise HomeAssistantAuthError()
_LOGGER.info("Updated Home Assistant API token")
tokens = await resp.json()
self.access_token = tokens['access_token']
self._access_token_expires = \
datetime.utcnow() + timedelta(seconds=tokens['expires_in'])
@asynccontextmanager
async def make_request(self,
method: str,
path: str,
json: Optional[Dict[str, Any]] = None,
content_type: Optional[str] = None,
data: Optional[bytes] = None,
timeout=30) -> AsyncContextManager[aiohttp.ClientResponse]:
"""Async context manager to make a request with right auth."""
url = f"{self.api_url}/{path}"
headers = {}
# Passthrough content type
if content_type is not None:
headers[hdrs.CONTENT_TYPE] = content_type
# Set old API Password
if self.api_password:
headers[HEADER_HA_ACCESS] = self.api_password
for _ in (1, 2):
# Prepare Access token
if self.refresh_token:
await self.ensure_access_token()
headers[hdrs.AUTHORIZATION] = f'Bearer {self.access_token}'
try:
async with getattr(self.sys_websession_ssl, method)(
url, data=data, timeout=timeout, json=json,
headers=headers) as resp:
# Access token expired
if resp.status == 401 and self.refresh_token:
self.access_token = None
continue
yield resp
return
except (asyncio.TimeoutError, aiohttp.ClientError) as err:
_LOGGER.error("Error on call %s: %s", url, err)
break
raise HomeAssistantAPIError()
async def check_api_state(self) -> bool:
"""Return True if Home Assistant up and running."""
with suppress(HomeAssistantAPIError):
async with self.make_request('get', 'api/') as resp:
if resp.status in (200, 201):
return True
status = resp.status
_LOGGER.warning("Home Assistant API config mismatch: %s", status)
return False
async def _block_till_run(self) -> None:
"""Block until Home-Assistant is booting up or startup timeout."""
start_time = time.monotonic()
migration_progress = False
migration_file = Path(self.sys_config.path_homeassistant,
'.migration_progress')
def check_port():
"""Check if port is mapped."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
result = sock.connect_ex((str(self.ip_address), self.api_port))
sock.close()
# Check if the port is available
if result == 0:
return True
except OSError:
pass
return False
while True:
await asyncio.sleep(5)
# 1: Check if Container is is_running
if not await self.instance.is_running():
_LOGGER.error("Home Assistant has crashed!")
break
# 2: Check if API response
if await self.sys_run_in_executor(check_port):
_LOGGER.info("Detect a running Home Assistant instance")
self._error_state = False
return
# 3: Running DB Migration
if migration_file.exists():
if not migration_progress:
migration_progress = True
_LOGGER.info("Home Assistant record migration in progress")
continue
elif migration_progress:
migration_progress = False # Reset start time
start_time = time.monotonic()
_LOGGER.info("Home Assistant record migration done")
# 4: Timeout
if time.monotonic() - start_time > self.wait_boot:
_LOGGER.warning("Don't wait anymore of Home Assistant startup!")
break
self._error_state = True
raise HomeAssistantError()