Fix HAOS sync output (#2755)

* Fix HAOS sync output

* revert api change

* As usaly

* Simplify code

* Adjust error handling
This commit is contained in:
Pascal Vizeli 2021-03-26 14:33:14 +01:00 committed by GitHub
parent 58c40cbef6
commit 9194088947
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 96 additions and 28 deletions

View File

@ -162,8 +162,7 @@ class Core(CoreSysAttributes):
await self.sys_supervisor.check_connectivity() await self.sys_supervisor.check_connectivity()
# Mark booted partition as healthy # Mark booted partition as healthy
if self.sys_hassos.available: await self.sys_hassos.mark_healthy()
await self.sys_hassos.mark_healthy()
# On release channel, try update itself # On release channel, try update itself
if self.sys_supervisor.need_update: if self.sys_supervisor.need_update:

View File

@ -95,7 +95,7 @@ class HassOSUpdateError(HassOSError):
"""Error on update of a HassOS.""" """Error on update of a HassOS."""
class HassOSNotSupportedError(HassioNotSupportedError): class HassOSJobError(HassOSError, JobException):
"""Function not supported by HassOS.""" """Function not supported by HassOS."""

View File

@ -10,8 +10,9 @@ from cpe import CPE
from .coresys import CoreSys, CoreSysAttributes from .coresys import CoreSys, CoreSysAttributes
from .dbus.rauc import RaucState from .dbus.rauc import RaucState
from .exceptions import DBusError, HassOSNotSupportedError, HassOSUpdateError from .exceptions import DBusError, HassOSJobError, HassOSUpdateError
from .utils import process_lock from .jobs.const import JobCondition, JobExecutionLimit
from .jobs.decorator import Job
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -22,7 +23,6 @@ class HassOS(CoreSysAttributes):
def __init__(self, coresys: CoreSys): def __init__(self, coresys: CoreSys):
"""Initialize HassOS handler.""" """Initialize HassOS handler."""
self.coresys: CoreSys = coresys self.coresys: CoreSys = coresys
self.lock: asyncio.Lock = asyncio.Lock()
self._available: bool = False self._available: bool = False
self._version: Optional[AwesomeVersion] = None self._version: Optional[AwesomeVersion] = None
self._board: Optional[str] = None self._board: Optional[str] = None
@ -55,18 +55,11 @@ class HassOS(CoreSysAttributes):
"""Return board name.""" """Return board name."""
return self._board return self._board
def _check_host(self) -> None:
"""Check if HassOS is available."""
if not self.available:
_LOGGER.error("No Home Assistant Operating System available")
raise HassOSNotSupportedError()
async def _download_raucb(self, version: AwesomeVersion) -> Path: async def _download_raucb(self, version: AwesomeVersion) -> Path:
"""Download rauc bundle (OTA) from github.""" """Download rauc bundle (OTA) from github."""
raw_url = self.sys_updater.ota_url raw_url = self.sys_updater.ota_url
if raw_url is None: if raw_url is None:
_LOGGER.error("Don't have an URL for OTA updates!") raise HassOSUpdateError("Don't have an URL for OTA updates!", _LOGGER.error)
raise HassOSNotSupportedError()
url = raw_url.format(version=str(version), board=self.board) url = raw_url.format(version=str(version), board=self.board)
_LOGGER.info("Fetch OTA update from %s", url) _LOGGER.info("Fetch OTA update from %s", url)
@ -75,7 +68,10 @@ class HassOS(CoreSysAttributes):
timeout = aiohttp.ClientTimeout(total=60 * 60, connect=180) timeout = aiohttp.ClientTimeout(total=60 * 60, connect=180)
async with self.sys_websession.get(url, timeout=timeout) as request: async with self.sys_websession.get(url, timeout=timeout) as request:
if request.status != 200: if request.status != 200:
raise HassOSUpdateError() raise HassOSUpdateError(
f"Error raise form OTA Webserver: {request.status}",
_LOGGER.error,
)
# Download RAUCB file # Download RAUCB file
with raucb.open("wb") as ota_file: with raucb.open("wb") as ota_file:
@ -90,12 +86,14 @@ class HassOS(CoreSysAttributes):
except (aiohttp.ClientError, asyncio.TimeoutError) as err: except (aiohttp.ClientError, asyncio.TimeoutError) as err:
self.sys_supervisor.connectivity = False self.sys_supervisor.connectivity = False
_LOGGER.warning("Can't fetch OTA update from %s: %s", url, err) raise HassOSUpdateError(
f"Can't fetch OTA update from {url}: {err!s}", _LOGGER.error
) from err
except OSError as err: except OSError as err:
_LOGGER.error("Can't write OTA file: %s", err) raise HassOSUpdateError(
f"Can't write OTA file: {err!s}", _LOGGER.error
raise HassOSUpdateError() ) from err
async def load(self) -> None: async def load(self) -> None:
"""Load HassOS data.""" """Load HassOS data."""
@ -126,25 +124,30 @@ class HassOS(CoreSysAttributes):
self.sys_dbus.rauc.boot_slot, self.sys_dbus.rauc.boot_slot,
) )
def config_sync(self) -> Awaitable[None]: @Job(
conditions=[JobCondition.HAOS],
on_condition=HassOSJobError,
)
async def config_sync(self) -> Awaitable[None]:
"""Trigger a host config reload from usb. """Trigger a host config reload from usb.
Return a coroutine. Return a coroutine.
""" """
self._check_host()
_LOGGER.info( _LOGGER.info(
"Synchronizing configuration from USB with Home Assistant Operating System." "Synchronizing configuration from USB with Home Assistant Operating System."
) )
return self.sys_host.services.restart("hassos-config.service") await self.sys_host.services.restart("hassos-config.service")
@process_lock @Job(
conditions=[JobCondition.HAOS, JobCondition.INTERNET_SYSTEM],
limit=JobExecutionLimit.ONCE,
on_condition=HassOSJobError,
)
async def update(self, version: Optional[AwesomeVersion] = None) -> None: async def update(self, version: Optional[AwesomeVersion] = None) -> None:
"""Update HassOS system.""" """Update HassOS system."""
version = version or self.latest_version version = version or self.latest_version
# Check installed version # Check installed version
self._check_host()
if version == self.version: if version == self.version:
raise HassOSUpdateError( raise HassOSUpdateError(
f"Version {version!s} is already installed", _LOGGER.warning f"Version {version!s} is already installed", _LOGGER.warning
@ -159,8 +162,7 @@ class HassOS(CoreSysAttributes):
completed = await self.sys_dbus.rauc.signal_completed() completed = await self.sys_dbus.rauc.signal_completed()
except DBusError as err: except DBusError as err:
_LOGGER.error("Rauc communication error") raise HassOSUpdateError("Rauc communication error", _LOGGER.error) from err
raise HassOSUpdateError() from err
finally: finally:
int_ota.unlink() int_ota.unlink()
@ -181,6 +183,7 @@ class HassOS(CoreSysAttributes):
) )
raise HassOSUpdateError() raise HassOSUpdateError()
@Job(conditions=[JobCondition.HAOS])
async def mark_healthy(self) -> None: async def mark_healthy(self) -> None:
"""Set booted partition as good for rauc.""" """Set booted partition as good for rauc."""
try: try:

View File

@ -17,11 +17,13 @@ class JobCondition(str, Enum):
INTERNET_SYSTEM = "internet_system" INTERNET_SYSTEM = "internet_system"
INTERNET_HOST = "internet_host" INTERNET_HOST = "internet_host"
RUNNING = "running" RUNNING = "running"
HAOS = "haos"
class JobExecutionLimit(str, Enum): class JobExecutionLimit(str, Enum):
"""Job Execution limits.""" """Job Execution limits."""
SINGLE_WAIT = "single_wait" SINGLE_WAIT = "single_wait"
ONCE = "once"
THROTTLE = "throttle" THROTTLE = "throttle"
THROTTLE_WAIT = "throttle_wait" THROTTLE_WAIT = "throttle_wait"

View File

@ -81,7 +81,7 @@ class Job(CoreSysAttributes):
raise self.on_condition() raise self.on_condition()
# Handle exection limits # Handle exection limits
if self.limit == JobExecutionLimit.SINGLE_WAIT: if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE):
await self._acquire_exection_limit() await self._acquire_exection_limit()
elif self.limit == JobExecutionLimit.THROTTLE: elif self.limit == JobExecutionLimit.THROTTLE:
time_since_last_call = datetime.now() - self._last_call time_since_last_call = datetime.now() - self._last_call
@ -176,21 +176,34 @@ class Job(CoreSysAttributes):
) )
return False return False
if JobCondition.HAOS in self.conditions and not self.sys_hassos.available:
_LOGGER.warning(
"'%s' blocked from execution, no Home Assistant OS available",
self._method.__qualname__,
)
return False
return True return True
async def _acquire_exection_limit(self) -> None: async def _acquire_exection_limit(self) -> None:
"""Process exection limits.""" """Process exection limits."""
if self.limit not in ( if self.limit not in (
JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
JobExecutionLimit.THROTTLE_WAIT, JobExecutionLimit.THROTTLE_WAIT,
): ):
return return
if self.limit == JobExecutionLimit.ONCE and self._lock.locked():
raise self.on_condition("Another job is running")
await self._lock.acquire() await self._lock.acquire()
def _release_exception_limits(self) -> None: def _release_exception_limits(self) -> None:
"""Release possible exception limits.""" """Release possible exception limits."""
if self.limit not in ( if self.limit not in (
JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.SINGLE_WAIT,
JobExecutionLimit.ONCE,
JobExecutionLimit.THROTTLE_WAIT, JobExecutionLimit.THROTTLE_WAIT,
): ):
return return

View File

@ -103,6 +103,29 @@ async def test_free_space(coresys: CoreSys):
assert not await test.execute() assert not await test.execute()
async def test_haos(coresys: CoreSys):
"""Test the haos decorator."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.HAOS])
async def execute(self):
"""Execute the class method."""
return True
test = TestClass(coresys)
coresys.hassos._available = True
assert await test.execute()
coresys.hassos._available = False
assert not await test.execute()
async def test_internet_connectivity_with_core_state(coresys: CoreSys): async def test_internet_connectivity_with_core_state(coresys: CoreSys):
"""Test the different core states and the impact for internet condition.""" """Test the different core states and the impact for internet condition."""
@ -345,3 +368,31 @@ async def test_exectution_limit_throttle(coresys: CoreSys, loop: asyncio.BaseEve
await asyncio.gather(*[test.execute(0.1)]) await asyncio.gather(*[test.execute(0.1)])
assert test.call == 1 assert test.call == 1
async def test_exectution_limit_once(coresys: CoreSys, loop: asyncio.BaseEventLoop):
"""Test the ignore conditions decorator."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.run = asyncio.Lock()
@Job(limit=JobExecutionLimit.ONCE, on_condition=JobException)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
async with self.run:
await asyncio.sleep(sleep)
test = TestClass(coresys)
run_task = loop.create_task(test.execute(0.3))
await asyncio.sleep(0.1)
with pytest.raises(JobException):
await test.execute(0.1)
await run_task