mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-15 21:26:29 +00:00
Add CodeNotary / for version file (#2731)
* Add CodeNotary / for version file * Apply suggestions from code review Co-authored-by: Joakim Sørensen <joasoe@gmail.com> * Address comment Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
parent
667672a20b
commit
f93f5d0e71
@ -3,7 +3,8 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional, TypeVar
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Coroutine, Optional, TypeVar
|
||||
|
||||
import aiohttp
|
||||
import sentry_sdk
|
||||
@ -11,6 +12,7 @@ import sentry_sdk
|
||||
from .config import CoreConfig
|
||||
from .const import ENV_SUPERVISOR_DEV
|
||||
from .docker import DockerAPI
|
||||
from .utils.codenotary import vcn_validate
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .addons import AddonManager
|
||||
@ -610,3 +612,9 @@ class CoreSysAttributes:
|
||||
def sys_capture_exception(self, err: Exception) -> None:
|
||||
"""Capture a exception."""
|
||||
sentry_sdk.capture_exception(err)
|
||||
|
||||
def sys_verify_content(
|
||||
self, checksum: Optional[str] = None, path: Optional[Path] = None
|
||||
) -> Awaitable[bool]:
|
||||
"""Verify content from HA org."""
|
||||
return vcn_validate(checksum, path, org="home-assistant.io")
|
||||
|
@ -320,6 +320,17 @@ class PwnedConnectivityError(PwnedError):
|
||||
"""Connectivity errors while checking pwned passwords."""
|
||||
|
||||
|
||||
# util/codenotary
|
||||
|
||||
|
||||
class CodeNotaryError(HassioError):
|
||||
"""Error general with CodeNotary."""
|
||||
|
||||
|
||||
class CodeNotaryUntrusted(CodeNotaryError):
|
||||
"""Error on untrusted content."""
|
||||
|
||||
|
||||
# docker/api
|
||||
|
||||
|
||||
|
@ -28,8 +28,14 @@ from .const import (
|
||||
UpdateChannel,
|
||||
)
|
||||
from .coresys import CoreSysAttributes
|
||||
from .exceptions import UpdaterError, UpdaterJobError
|
||||
from .exceptions import (
|
||||
CodeNotaryError,
|
||||
CodeNotaryUntrusted,
|
||||
UpdaterError,
|
||||
UpdaterJobError,
|
||||
)
|
||||
from .jobs.decorator import Job, JobCondition
|
||||
from .utils.codenotary import calc_checksum
|
||||
from .utils.common import FileConfiguration
|
||||
from .validate import SCHEMA_UPDATER_CONFIG
|
||||
|
||||
@ -180,24 +186,37 @@ class Updater(FileConfiguration, CoreSysAttributes):
|
||||
url = URL_HASSIO_VERSION.format(channel=self.channel)
|
||||
machine = self.sys_machine or "default"
|
||||
|
||||
# Get data
|
||||
try:
|
||||
_LOGGER.info("Fetching update data from %s", url)
|
||||
async with self.sys_websession.get(url, timeout=10) as request:
|
||||
data = await request.json(content_type=None)
|
||||
data = await request.read()
|
||||
|
||||
except (aiohttp.ClientError, asyncio.TimeoutError) as err:
|
||||
self.sys_supervisor.connectivity = False
|
||||
_LOGGER.warning("Can't fetch versions from %s: %s", url, err)
|
||||
raise UpdaterError() from err
|
||||
raise UpdaterError(
|
||||
f"Can't fetch versions from {url}: {err}", _LOGGER.warning
|
||||
) from err
|
||||
|
||||
# Validate
|
||||
try:
|
||||
await self.sys_verify_content(checksum=calc_checksum(data))
|
||||
except CodeNotaryUntrusted:
|
||||
_LOGGER.critical("Content-Trust is broken for the version file fetch!")
|
||||
except CodeNotaryError as err:
|
||||
_LOGGER.error("CodeNotary error while processing version checks: %s", err)
|
||||
|
||||
# Parse data
|
||||
try:
|
||||
data = json.loads(data)
|
||||
except json.JSONDecodeError as err:
|
||||
_LOGGER.warning("Can't parse versions from %s: %s", url, err)
|
||||
raise UpdaterError() from err
|
||||
raise UpdaterError(
|
||||
f"Can't parse versions from {url}: {err}", _LOGGER.warning
|
||||
) from err
|
||||
|
||||
# data valid?
|
||||
if not data or data.get(ATTR_CHANNEL) != self.channel:
|
||||
_LOGGER.warning("Invalid data from %s", url)
|
||||
raise UpdaterError()
|
||||
raise UpdaterError(f"Invalid data from {url}", _LOGGER.warning)
|
||||
|
||||
try:
|
||||
# Update supervisor version
|
||||
@ -232,8 +251,9 @@ class Updater(FileConfiguration, CoreSysAttributes):
|
||||
self._data[ATTR_IMAGE][ATTR_MULTICAST] = data["image"]["multicast"]
|
||||
|
||||
except KeyError as err:
|
||||
_LOGGER.warning("Can't process version data: %s", err)
|
||||
raise UpdaterError() from err
|
||||
raise UpdaterError(
|
||||
f"Can't process version data: {err}", _LOGGER.warning
|
||||
) from err
|
||||
|
||||
else:
|
||||
self.save_data()
|
||||
|
95
supervisor/utils/codenotary.py
Normal file
95
supervisor/utils/codenotary.py
Normal file
@ -0,0 +1,95 @@
|
||||
"""Small wrapper for CodeNotary."""
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import shlex
|
||||
from typing import Optional, Set, Tuple, Union
|
||||
|
||||
import async_timeout
|
||||
|
||||
from ..exceptions import CodeNotaryError, CodeNotaryUntrusted
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
_VCN_CMD: str = "vcn authenticate --silent --output json"
|
||||
_CACHE: Set[Tuple[str, Path, str, str]] = set()
|
||||
|
||||
|
||||
_ATTR_ERROR = "error"
|
||||
_ATTR_VERIFICATION = "verification"
|
||||
_ATTR_STATUS = "status"
|
||||
|
||||
|
||||
def calc_checksum(data: Union[str, bytes]) -> str:
|
||||
"""Generate checksum for CodeNotary."""
|
||||
if isinstance(data, str):
|
||||
return hashlib.sha256(data.encode()).hexdigest()
|
||||
return hashlib.sha256(data).hexdigest()
|
||||
|
||||
|
||||
async def vcn_validate(
|
||||
checksum: Optional[str] = None,
|
||||
path: Optional[Path] = None,
|
||||
org: Optional[str] = None,
|
||||
signer: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Validate data against CodeNotary."""
|
||||
if (checksum, path, org, signer) in _CACHE:
|
||||
return
|
||||
command = shlex.split(_VCN_CMD)
|
||||
|
||||
# Generate command for request
|
||||
if org:
|
||||
command.extend(["--org", org])
|
||||
elif signer:
|
||||
command.extend(["--signer", signer])
|
||||
|
||||
if checksum:
|
||||
command.extend(["--hash", checksum])
|
||||
elif path:
|
||||
if path.is_dir:
|
||||
command.append(f"dir:/{path!s}")
|
||||
else:
|
||||
command.append(path.as_posix())
|
||||
else:
|
||||
RuntimeError("At least path or checksum need to be set!")
|
||||
|
||||
# Request notary authorization
|
||||
_LOGGER.debug("Send vcn command: %s", command)
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*command,
|
||||
stdin=asyncio.subprocess.DEVNULL,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.DEVNULL,
|
||||
)
|
||||
|
||||
async with async_timeout.timeout(10):
|
||||
data, _ = await proc.communicate()
|
||||
except OSError as err:
|
||||
raise CodeNotaryError(
|
||||
f"CodeNotary fatal error: {err!s}", _LOGGER.critical
|
||||
) from err
|
||||
except asyncio.TimeoutError:
|
||||
raise CodeNotaryError(
|
||||
"Timeout while processing CodeNotary", _LOGGER.error
|
||||
) from None
|
||||
|
||||
# Parse data
|
||||
try:
|
||||
data_json = json.loads(data)
|
||||
_LOGGER.debug("CodeNotary response with: %s", data_json)
|
||||
except (json.JSONDecodeError, UnicodeDecodeError) as err:
|
||||
raise CodeNotaryError(
|
||||
f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error
|
||||
) from err
|
||||
|
||||
if _ATTR_ERROR in data_json:
|
||||
raise CodeNotaryError(data_json[_ATTR_ERROR], _LOGGER.warning)
|
||||
|
||||
if data_json[_ATTR_VERIFICATION][_ATTR_STATUS] == 0:
|
||||
_CACHE.add((checksum, path, org, signer))
|
||||
else:
|
||||
raise CodeNotaryUntrusted()
|
@ -2,15 +2,16 @@
|
||||
import asyncio
|
||||
import io
|
||||
import logging
|
||||
from typing import Set
|
||||
|
||||
import aiohttp
|
||||
|
||||
from ..exceptions import PwnedConnectivityError, PwnedError
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
_API_CALL = "https://api.pwnedpasswords.com/range/{hash}"
|
||||
_API_CALL: str = "https://api.pwnedpasswords.com/range/{hash}"
|
||||
|
||||
_CACHE = set()
|
||||
_CACHE: Set[str] = set()
|
||||
|
||||
|
||||
async def check_pwned_password(websession: aiohttp.ClientSession, sha1_pw: str) -> bool:
|
||||
@ -27,7 +28,9 @@ async def check_pwned_password(websession: aiohttp.ClientSession, sha1_pw: str)
|
||||
_API_CALL.format(hash=sha1_short), timeout=aiohttp.ClientTimeout(total=10)
|
||||
) as request:
|
||||
if request.status != 200:
|
||||
raise PwnedError()
|
||||
raise PwnedError(
|
||||
f"Pwned service response with {request.status}", _LOGGER.warning
|
||||
)
|
||||
data = await request.text()
|
||||
|
||||
buffer = io.StringIO(data)
|
||||
@ -38,7 +41,8 @@ async def check_pwned_password(websession: aiohttp.ClientSession, sha1_pw: str)
|
||||
return True
|
||||
|
||||
except (aiohttp.ClientError, asyncio.TimeoutError) as err:
|
||||
_LOGGER.warning("Can't fetch hibp data: %s", err)
|
||||
raise PwnedConnectivityError() from err
|
||||
raise PwnedConnectivityError(
|
||||
f"Can't fetch HIBP data: {err}", _LOGGER.warning
|
||||
) from err
|
||||
|
||||
return False
|
||||
|
13
tests/utils/test_codenotary.py
Normal file
13
tests/utils/test_codenotary.py
Normal file
@ -0,0 +1,13 @@
|
||||
"""Test CodeNotary."""
|
||||
|
||||
|
||||
from supervisor.utils.codenotary import calc_checksum
|
||||
|
||||
|
||||
def test_checksum_calc():
|
||||
"""Calc Checkusm as test."""
|
||||
assert calc_checksum("test") == calc_checksum(b"test")
|
||||
assert (
|
||||
calc_checksum("test")
|
||||
== "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user