"""Home Assistant control object.""" import asyncio from contextlib import asynccontextmanager, suppress from datetime import datetime, timedelta from ipaddress import IPv4Address import logging import os from pathlib import Path import re import secrets import socket import time from typing import Any, AsyncContextManager, Awaitable, Dict, Optional from uuid import UUID import aiohttp from aiohttp import hdrs import attr from .const import ( ATTR_ACCESS_TOKEN, ATTR_BOOT, ATTR_IMAGE, ATTR_LAST_VERSION, ATTR_PASSWORD, ATTR_PORT, ATTR_REFRESH_TOKEN, ATTR_SSL, ATTR_UUID, ATTR_WAIT_BOOT, ATTR_WATCHDOG, FILE_HASSIO_HOMEASSISTANT, HEADER_HA_ACCESS, ) from .coresys import CoreSys, CoreSysAttributes from .docker.homeassistant import DockerHomeAssistant from .docker.stats import DockerStats from .exceptions import ( DockerAPIError, HomeAssistantAPIError, HomeAssistantAuthError, HomeAssistantError, HomeAssistantUpdateError, ) from .utils import convert_to_ascii, process_lock from .utils.json import JsonConfig from .validate import SCHEMA_HASS_CONFIG _LOGGER = logging.getLogger(__name__) RE_YAML_ERROR = re.compile(r"homeassistant\.util\.yaml") @attr.s(frozen=True) class ConfigResult: """Return object from config check.""" valid = attr.ib() log = attr.ib() class HomeAssistant(JsonConfig, CoreSysAttributes): """Home Assistant core object for handle it.""" def __init__(self, coresys: CoreSys): """Initialize Home Assistant object.""" super().__init__(FILE_HASSIO_HOMEASSISTANT, SCHEMA_HASS_CONFIG) self.coresys: CoreSys = coresys self.instance: DockerHomeAssistant = DockerHomeAssistant(coresys) self.lock: asyncio.Lock = asyncio.Lock(loop=coresys.loop) self._error_state: bool = False # We don't persist access tokens. Instead we fetch new ones when needed self.access_token: Optional[str] = None self._access_token_expires: Optional[datetime] = None async def load(self) -> None: """Prepare Home Assistant object.""" with suppress(DockerAPIError): await self.instance.attach() return _LOGGER.info("No Home Assistant Docker image %s found.", self.image) await self.install_landingpage() @property def machine(self) -> str: """Return the system machines.""" return self.instance.machine @property def arch(self) -> str: """Return arch of running Home Assistant.""" return self.instance.arch @property def error_state(self) -> bool: """Return True if system is in error.""" return self._error_state @property def ip_address(self) -> IPv4Address: """Return IP of Home Assistant instance.""" return self.instance.ip_address @property def api_port(self) -> int: """Return network port to Home Assistant instance.""" return self._data[ATTR_PORT] @api_port.setter def api_port(self, value: int) -> None: """Set network port for Home Assistant instance.""" self._data[ATTR_PORT] = value @property def api_password(self) -> str: """Return password for Home Assistant instance.""" return self._data.get(ATTR_PASSWORD) @api_password.setter def api_password(self, value: str): """Set password for Home Assistant instance.""" self._data[ATTR_PASSWORD] = value @property def api_ssl(self) -> bool: """Return if we need ssl to Home Assistant instance.""" return self._data[ATTR_SSL] @api_ssl.setter def api_ssl(self, value: bool): """Set SSL for Home Assistant instance.""" self._data[ATTR_SSL] = value @property def api_url(self) -> str: """Return API url to Home Assistant.""" return "{}://{}:{}".format('https' if self.api_ssl else 'http', self.ip_address, self.api_port) @property def watchdog(self) -> bool: """Return True if the watchdog should protect Home Assistant.""" return self._data[ATTR_WATCHDOG] @watchdog.setter def watchdog(self, value: bool): """Return True if the watchdog should protect Home Assistant.""" self._data[ATTR_WATCHDOG] = value @property def wait_boot(self) -> int: """Return time to wait for Home Assistant startup.""" return self._data[ATTR_WAIT_BOOT] @wait_boot.setter def wait_boot(self, value: int): """Set time to wait for Home Assistant startup.""" self._data[ATTR_WAIT_BOOT] = value @property def version(self) -> str: """Return version of running Home Assistant.""" return self.instance.version @property def latest_version(self) -> str: """Return last available version of Home Assistant.""" if self.is_custom_image: return self._data.get(ATTR_LAST_VERSION) return self.sys_updater.version_homeassistant @latest_version.setter def latest_version(self, value: str): """Set last available version of Home Assistant.""" if value: self._data[ATTR_LAST_VERSION] = value else: self._data.pop(ATTR_LAST_VERSION, None) @property def image(self) -> str: """Return image name of the Home Assistant container.""" if self._data.get(ATTR_IMAGE): return self._data[ATTR_IMAGE] return os.environ['HOMEASSISTANT_REPOSITORY'] @image.setter def image(self, value: str): """Set image name of Home Assistant container.""" if value: self._data[ATTR_IMAGE] = value else: self._data.pop(ATTR_IMAGE, None) @property def is_custom_image(self) -> bool: """Return True if a custom image is used.""" return all( attr in self._data for attr in (ATTR_IMAGE, ATTR_LAST_VERSION)) @property def boot(self) -> bool: """Return True if Home Assistant boot is enabled.""" return self._data[ATTR_BOOT] @boot.setter def boot(self, value: bool): """Set Home Assistant boot options.""" self._data[ATTR_BOOT] = value @property def uuid(self) -> UUID: """Return a UUID of this Home Assistant instance.""" return self._data[ATTR_UUID] @property def hassio_token(self) -> str: """Return an access token for the Hass.io API.""" return self._data.get(ATTR_ACCESS_TOKEN) @property def refresh_token(self) -> str: """Return the refresh token to authenticate with Home Assistant.""" return self._data.get(ATTR_REFRESH_TOKEN) @refresh_token.setter def refresh_token(self, value: str): """Set Home Assistant refresh_token.""" self._data[ATTR_REFRESH_TOKEN] = value @process_lock async def install_landingpage(self) -> None: """Install a landing page.""" _LOGGER.info("Setup HomeAssistant landingpage") while True: with suppress(DockerAPIError): await self.instance.install('landingpage') return _LOGGER.warning("Fails install landingpage, retry after 30sec") await asyncio.sleep(30) @process_lock async def install(self) -> None: """Install a landing page.""" _LOGGER.info("Setup Home Assistant") while True: # read homeassistant tag and install it if not self.latest_version: await self.sys_updater.reload() tag = self.latest_version if tag: with suppress(DockerAPIError): await self.instance.install(tag) break _LOGGER.warning("Error on install Home Assistant. Retry in 30sec") await asyncio.sleep(30) # finishing _LOGGER.info("Home Assistant docker now installed") try: if not self.boot: return _LOGGER.info("Start Home Assistant") await self._start() except HomeAssistantError: _LOGGER.error("Can't start Home Assistant!") finally: with suppress(DockerAPIError): await self.instance.cleanup() @process_lock async def update(self, version=None) -> None: """Update HomeAssistant version.""" version = version or self.latest_version rollback = self.version if not self.error_state else None running = await self.instance.is_running() exists = await self.instance.exists() if exists and version == self.instance.version: _LOGGER.warning("Version %s is already installed", version) return # process an update async def _update(to_version): """Run Home Assistant update.""" _LOGGER.info("Update Home Assistant to version %s", to_version) try: await self.instance.update(to_version) except DockerAPIError: _LOGGER.warning("Update Home Assistant image fails") raise HomeAssistantUpdateError() from None if running: await self._start() _LOGGER.info("Successful run Home Assistant %s", to_version) # Update Home Assistant with suppress(HomeAssistantError): await _update(version) return # Update going wrong, revert it if self.error_state and rollback: _LOGGER.fatal("HomeAssistant update fails -> rollback!") await _update(rollback) else: raise HomeAssistantUpdateError() async def _start(self) -> None: """Start Home Assistant Docker & wait.""" if await self.instance.is_running(): _LOGGER.warning("Home Assistant is already running!") return # Create new API token self._data[ATTR_ACCESS_TOKEN] = secrets.token_hex(56) self.save_data() try: await self.instance.run() except DockerAPIError: raise HomeAssistantError() from None await self._block_till_run() @process_lock async def start(self) -> None: """Run Home Assistant docker.""" try: if await self.instance.is_running(): await self.instance.restart() elif await self.instance.is_initialize(): await self.instance.start() else: await self._start() return await self._block_till_run() except DockerAPIError: raise HomeAssistantError() from None @process_lock async def stop(self) -> None: """Stop Home Assistant Docker. Return a coroutine. """ try: return await self.instance.stop(remove_container=False) except DockerAPIError: raise HomeAssistantError() from None @process_lock async def restart(self) -> None: """Restart Home Assistant Docker.""" try: await self.instance.restart() except DockerAPIError: raise HomeAssistantError() from None await self._block_till_run() @process_lock async def rebuild(self) -> None: """Rebuild Home Assistant Docker container.""" with suppress(DockerAPIError): await self.instance.stop() await self._start() def logs(self) -> Awaitable[bytes]: """Get HomeAssistant docker logs. Return a coroutine. """ return self.instance.logs() async def stats(self) -> DockerStats: """Return stats of Home Assistant. Return a coroutine. """ try: return await self.instance.stats() except DockerAPIError: raise HomeAssistantError() from None def is_running(self) -> Awaitable[bool]: """Return True if Docker container is running. Return a coroutine. """ return self.instance.is_running() def is_fails(self) -> Awaitable[bool]: """Return True if a Docker container is fails state. Return a coroutine. """ return self.instance.is_fails() @property def in_progress(self) -> bool: """Return True if a task is in progress.""" return self.instance.in_progress or self.lock.locked() async def check_config(self) -> ConfigResult: """Run Home Assistant config check.""" result = await self.instance.execute_command( "python3 -m homeassistant -c /config --script check_config") # if not valid if result.exit_code is None: _LOGGER.error("Fatal error on config check!") raise HomeAssistantError() # parse output log = convert_to_ascii(result.output) if result.exit_code != 0 or RE_YAML_ERROR.search(log): _LOGGER.error("Invalid Home Assistant config found!") return ConfigResult(False, log) _LOGGER.info("Home Assistant config is valid") return ConfigResult(True, log) async def ensure_access_token(self) -> None: """Ensures there is an access token.""" if self.access_token is not None and self._access_token_expires > datetime.utcnow(): return with suppress(asyncio.TimeoutError, aiohttp.ClientError): async with self.sys_websession_ssl.post( f"{self.api_url}/auth/token", timeout=30, data={ "grant_type": "refresh_token", "refresh_token": self.refresh_token }) as resp: if resp.status != 200: _LOGGER.error("Can't update Home Assistant access token!") raise HomeAssistantAuthError() _LOGGER.info("Updated Home Assistant API token") tokens = await resp.json() self.access_token = tokens['access_token'] self._access_token_expires = \ datetime.utcnow() + timedelta(seconds=tokens['expires_in']) @asynccontextmanager async def make_request(self, method: str, path: str, json: Optional[Dict[str, Any]] = None, content_type: Optional[str] = None, data: Optional[bytes] = None, timeout=30) -> AsyncContextManager[aiohttp.ClientResponse]: """Async context manager to make a request with right auth.""" url = f"{self.api_url}/{path}" headers = {} # Passthrough content type if content_type is not None: headers[hdrs.CONTENT_TYPE] = content_type # Set old API Password if self.api_password: headers[HEADER_HA_ACCESS] = self.api_password for _ in (1, 2): # Prepare Access token if self.refresh_token: await self.ensure_access_token() headers[hdrs.AUTHORIZATION] = f'Bearer {self.access_token}' try: async with getattr(self.sys_websession_ssl, method)( url, data=data, timeout=timeout, json=json, headers=headers) as resp: # Access token expired if resp.status == 401 and self.refresh_token: self.access_token = None continue yield resp return except (asyncio.TimeoutError, aiohttp.ClientError) as err: _LOGGER.error("Error on call %s: %s", url, err) break raise HomeAssistantAPIError() async def check_api_state(self) -> bool: """Return True if Home Assistant up and running.""" with suppress(HomeAssistantAPIError): async with self.make_request('get', 'api/') as resp: if resp.status in (200, 201): return True status = resp.status _LOGGER.warning("Home Assistant API config mismatch: %s", status) return False async def _block_till_run(self) -> None: """Block until Home-Assistant is booting up or startup timeout.""" start_time = time.monotonic() migration_progress = False migration_file = Path(self.sys_config.path_homeassistant, '.migration_progress') def check_port(): """Check if port is mapped.""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: result = sock.connect_ex((str(self.ip_address), self.api_port)) sock.close() # Check if the port is available if result == 0: return True except OSError: pass return False while True: await asyncio.sleep(5) # 1: Check if Container is is_running if not await self.instance.is_running(): _LOGGER.error("Home Assistant has crashed!") break # 2: Check if API response if await self.sys_run_in_executor(check_port): _LOGGER.info("Detect a running Home Assistant instance") self._error_state = False return # 3: Running DB Migration if migration_file.exists(): if not migration_progress: migration_progress = True _LOGGER.info("Home Assistant record migration in progress") continue elif migration_progress: migration_progress = False # Reset start time start_time = time.monotonic() _LOGGER.info("Home Assistant record migration done") # 4: Timeout if time.monotonic() - start_time > self.wait_boot: _LOGGER.warning("Don't wait anymore of Home Assistant startup!") break self._error_state = True raise HomeAssistantError()