Validate secrets on options/validate UI check (#2854)

* Validate secrets on options/validate UI check

* Allow schema as payload

* Update supervisor/api/addons.py

Co-authored-by: Franck Nijhof <git@frenck.dev>

* Offload into a module

* using new function

* disable check

* fix options value

* generated return value

* add debug logging

Co-authored-by: Franck Nijhof <git@frenck.dev>
This commit is contained in:
Pascal Vizeli 2021-05-10 14:27:50 +02:00 committed by GitHub
parent efc2e826a1
commit b59f741162
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 556 additions and 406 deletions

View File

@ -426,30 +426,20 @@ class Addon(AddonModel):
@property @property
def devices(self) -> Set[Device]: def devices(self) -> Set[Device]:
"""Extract devices from add-on options.""" """Extract devices from add-on options."""
raw_schema = self.data[ATTR_SCHEMA] options_schema = self.schema
if isinstance(raw_schema, bool) or not raw_schema:
return set()
# Validate devices
options_validator = AddonOptions(self.coresys, raw_schema, self.name, self.slug)
with suppress(vol.Invalid): with suppress(vol.Invalid):
options_validator(self.options) options_schema.validate(self.options)
return options_validator.devices return options_schema.devices
@property @property
def pwned(self) -> Set[str]: def pwned(self) -> Set[str]:
"""Extract pwned data for add-on options.""" """Extract pwned data for add-on options."""
raw_schema = self.data[ATTR_SCHEMA] options_schema = self.schema
if isinstance(raw_schema, bool) or not raw_schema:
return set()
# Validate devices
options_validator = AddonOptions(self.coresys, raw_schema, self.name, self.slug)
with suppress(vol.Invalid): with suppress(vol.Invalid):
options_validator(self.options) options_schema.validate(self.options)
return options_validator.pwned return options_schema.pwned
def save_persist(self) -> None: def save_persist(self) -> None:
"""Save data of add-on.""" """Save data of add-on."""
@ -503,7 +493,7 @@ class Addon(AddonModel):
await self.sys_homeassistant.secrets.reload() await self.sys_homeassistant.secrets.reload()
try: try:
options = self.schema(self.options) options = self.schema.validate(self.options)
write_json_file(self.path_options, options) write_json_file(self.path_options, options)
except vol.Invalid as ex: except vol.Invalid as ex:
_LOGGER.error( _LOGGER.error(

View File

@ -4,7 +4,6 @@ from pathlib import Path
from typing import Any, Awaitable, Dict, List, Optional from typing import Any, Awaitable, Dict, List, Optional
from awesomeversion import AwesomeVersion, AwesomeVersionException from awesomeversion import AwesomeVersion, AwesomeVersionException
import voluptuous as vol
from ..const import ( from ..const import (
ATTR_ADVANCED, ATTR_ADVANCED,
@ -532,18 +531,16 @@ class AddonModel(CoreSysAttributes, ABC):
return Path(self.path_location, "apparmor.txt") return Path(self.path_location, "apparmor.txt")
@property @property
def schema(self) -> vol.Schema: def schema(self) -> AddonOptions:
"""Create a schema for add-on options.""" """Return Addon options validation object."""
raw_schema = self.data[ATTR_SCHEMA] raw_schema = self.data[ATTR_SCHEMA]
if isinstance(raw_schema, bool): if isinstance(raw_schema, bool):
raw_schema = {} raw_schema = {}
return vol.Schema(
vol.All(dict, AddonOptions(self.coresys, raw_schema, self.name, self.slug)) return AddonOptions(self.coresys, raw_schema, self.name, self.slug)
)
@property @property
def schema_ui(self) -> Optional[List[Dict[str, Any]]]: def schema_ui(self) -> Optional[List[Dict[any, any]]]:
"""Create a UI schema for add-on options.""" """Create a UI schema for add-on options."""
raw_schema = self.data[ATTR_SCHEMA] raw_schema = self.data[ATTR_SCHEMA]

View File

@ -69,6 +69,11 @@ class AddonOptions(CoreSysAttributes):
self._name = name self._name = name
self._slug = slug self._slug = slug
@property
def validate(self) -> vol.Schema:
"""Create a schema for add-on options."""
return vol.Schema(vol.All(dict, self))
def __call__(self, struct): def __call__(self, struct):
"""Create schema validator for add-ons options.""" """Create schema validator for add-ons options."""
options = {} options = {}

View File

@ -19,13 +19,14 @@ from .host import APIHost
from .info import APIInfo from .info import APIInfo
from .ingress import APIIngress from .ingress import APIIngress
from .jobs import APIJobs from .jobs import APIJobs
from .middleware_security import SecurityMiddleware
from .multicast import APIMulticast from .multicast import APIMulticast
from .network import APINetwork from .network import APINetwork
from .observer import APIObserver from .observer import APIObserver
from .os import APIOS from .os import APIOS
from .proxy import APIProxy from .proxy import APIProxy
from .resolution import APIResoulution from .resolution import APIResoulution
from .security import SecurityMiddleware from .security import APISecurity
from .services import APIServices from .services import APIServices
from .snapshots import APISnapshots from .snapshots import APISnapshots
from .store import APIStore from .store import APIStore
@ -82,6 +83,7 @@ class RestAPI(CoreSysAttributes):
self._register_snapshots() self._register_snapshots()
self._register_supervisor() self._register_supervisor()
self._register_store() self._register_store()
self._register_security()
await self.start() await self.start()
@ -146,6 +148,18 @@ class RestAPI(CoreSysAttributes):
] ]
) )
def _register_security(self) -> None:
"""Register Security functions."""
api_security = APISecurity()
api_security.coresys = self.coresys
self.webapp.add_routes(
[
web.get("/security/info", api_security.info),
web.post("/security/options", api_security.options),
]
)
def _register_jobs(self) -> None: def _register_jobs(self) -> None:
"""Register Jobs functions.""" """Register Jobs functions."""
api_jobs = APIJobs() api_jobs = APIJobs()

View File

@ -71,6 +71,7 @@ from ..const import (
ATTR_OPTIONS, ATTR_OPTIONS,
ATTR_PRIVILEGED, ATTR_PRIVILEGED,
ATTR_PROTECTED, ATTR_PROTECTED,
ATTR_PWNED,
ATTR_RATING, ATTR_RATING,
ATTR_REPOSITORIES, ATTR_REPOSITORIES,
ATTR_REPOSITORY, ATTR_REPOSITORY,
@ -104,9 +105,8 @@ from ..const import (
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..docker.stats import DockerStats from ..docker.stats import DockerStats
from ..exceptions import APIError, APIForbidden, PwnedError, PwnedSecret from ..exceptions import APIError, APIForbidden, PwnedError, PwnedSecret
from ..utils.pwned import check_pwned_password
from ..validate import docker_ports from ..validate import docker_ports
from .utils import api_process, api_process_raw, api_validate from .utils import api_process, api_process_raw, api_validate, json_loads
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -338,30 +338,38 @@ class APIAddons(CoreSysAttributes):
async def options_validate(self, request: web.Request) -> None: async def options_validate(self, request: web.Request) -> None:
"""Validate user options for add-on.""" """Validate user options for add-on."""
addon = self._extract_addon_installed(request) addon = self._extract_addon_installed(request)
data = {ATTR_MESSAGE: "", ATTR_VALID: True} data = {ATTR_MESSAGE: "", ATTR_VALID: True, ATTR_PWNED: False}
options = await request.json(loads=json_loads) or addon.options
# Validate config # Validate config
options_schema = addon.schema
try: try:
addon.schema(addon.options) options_schema.validate(options)
except vol.Invalid as ex: except vol.Invalid as ex:
data[ATTR_MESSAGE] = humanize_error(addon.options, ex) data[ATTR_MESSAGE] = humanize_error(options, ex)
data[ATTR_VALID] = False data[ATTR_VALID] = False
# Validate security if not self.sys_security.pwned:
if self.sys_config.force_security: return data
for secret in addon.pwned:
try:
await check_pwned_password(self.sys_websession, secret)
continue
except PwnedSecret:
data[ATTR_MESSAGE] = "Add-on use pwned secrets!"
except PwnedError as err:
data[
ATTR_MESSAGE
] = f"Error happening on pwned secrets check: {err!s}!"
data[ATTR_VALID] = False # Pwned check
break for secret in options_schema.pwned:
try:
await self.sys_security.verify_secret(secret)
continue
except PwnedSecret:
data[ATTR_PWNED] = True
except PwnedError:
data[ATTR_PWNED] = None
break
if self.sys_security.force and data[ATTR_PWNED] in (None, True):
data[ATTR_VALID] = False
if data[ATTR_PWNED] is None:
data[ATTR_MESSAGE] = "Error happening on pwned secrets check!"
else:
data[ATTR_MESSAGE] = "Add-on uses pwned secrets!"
return data return data
@ -374,7 +382,7 @@ class APIAddons(CoreSysAttributes):
addon = self._extract_addon_installed(request) addon = self._extract_addon_installed(request)
try: try:
return addon.schema(addon.options) return addon.schema.validate(addon.options)
except vol.Invalid: except vol.Invalid:
raise APIError("Invalid configuration data for the add-on") from None raise APIError("Invalid configuration data for the add-on") from None

View File

@ -0,0 +1,200 @@
"""Handle security part of this API."""
import logging
import re
from aiohttp.web import Request, RequestHandler, Response, middleware
from aiohttp.web_exceptions import HTTPForbidden, HTTPUnauthorized
from ..const import (
REQUEST_FROM,
ROLE_ADMIN,
ROLE_BACKUP,
ROLE_DEFAULT,
ROLE_HOMEASSISTANT,
ROLE_MANAGER,
CoreState,
)
from ..coresys import CoreSys, CoreSysAttributes
from .utils import api_return_error, excract_supervisor_token
_LOGGER: logging.Logger = logging.getLogger(__name__)
# fmt: off
# Block Anytime
BLACKLIST = re.compile(
r"^(?:"
r"|/homeassistant/api/hassio/.*"
r"|/core/api/hassio/.*"
r")$"
)
# Free to call or have own security concepts
NO_SECURITY_CHECK = re.compile(
r"^(?:"
r"|/homeassistant/api/.*"
r"|/homeassistant/websocket"
r"|/core/api/.*"
r"|/core/websocket"
r"|/supervisor/ping"
r")$"
)
# Observer allow API calls
OBSERVER_CHECK = re.compile(
r"^(?:"
r"|/.+/info"
r")$"
)
# Can called by every add-on
ADDONS_API_BYPASS = re.compile(
r"^(?:"
r"|/addons/self/(?!security|update)[^/]+"
r"|/addons/self/options/config"
r"|/info"
r"|/hardware/trigger"
r"|/services.*"
r"|/discovery.*"
r"|/auth"
r")$"
)
# Policy role add-on API access
ADDONS_ROLE_ACCESS = {
ROLE_DEFAULT: re.compile(
r"^(?:"
r"|/.+/info"
r"|/addons"
r")$"
),
ROLE_HOMEASSISTANT: re.compile(
r"^(?:"
r"|/core/.+"
r"|/homeassistant/.+"
r")$"
),
ROLE_BACKUP: re.compile(
r"^(?:"
r"|/snapshots.*"
r")$"
),
ROLE_MANAGER: re.compile(
r"^(?:"
r"|/addons(?:/[^/]+/(?!security).+|/reload)?"
r"|/audio/.+"
r"|/auth/cache"
r"|/cli/.+"
r"|/core/.+"
r"|/dns/.+"
r"|/docker/.+"
r"|/jobs/.+"
r"|/hardware/.+"
r"|/hassos/.+"
r"|/homeassistant/.+"
r"|/host/.+"
r"|/multicast/.+"
r"|/network/.+"
r"|/observer/.+"
r"|/os/.+"
r"|/resolution/.+"
r"|/snapshots.*"
r"|/store.*"
r"|/supervisor/.+"
r")$"
),
ROLE_ADMIN: re.compile(
r".*"
),
}
# fmt: on
class SecurityMiddleware(CoreSysAttributes):
"""Security middleware functions."""
def __init__(self, coresys: CoreSys):
"""Initialize security middleware."""
self.coresys: CoreSys = coresys
@middleware
async def system_validation(
self, request: Request, handler: RequestHandler
) -> Response:
"""Check if core is ready to response."""
if self.sys_core.state not in (
CoreState.STARTUP,
CoreState.RUNNING,
CoreState.FREEZE,
):
return api_return_error(
message=f"System is not ready with state: {self.sys_core.state.value}"
)
return await handler(request)
@middleware
async def token_validation(
self, request: Request, handler: RequestHandler
) -> Response:
"""Check security access of this layer."""
request_from = None
supervisor_token = excract_supervisor_token(request)
# Blacklist
if BLACKLIST.match(request.path):
_LOGGER.error("%s is blacklisted!", request.path)
raise HTTPForbidden()
# Ignore security check
if NO_SECURITY_CHECK.match(request.path):
_LOGGER.debug("Passthrough %s", request.path)
return await handler(request)
# Not token
if not supervisor_token:
_LOGGER.warning("No API token provided for %s", request.path)
raise HTTPUnauthorized()
# Home-Assistant
if supervisor_token == self.sys_homeassistant.supervisor_token:
_LOGGER.debug("%s access from Home Assistant", request.path)
request_from = self.sys_homeassistant
# Host
if supervisor_token == self.sys_plugins.cli.supervisor_token:
_LOGGER.debug("%s access from Host", request.path)
request_from = self.sys_host
# Observer
if supervisor_token == self.sys_plugins.observer.supervisor_token:
if not OBSERVER_CHECK.match(request.url):
_LOGGER.warning("%s invalid Observer access", request.path)
raise HTTPForbidden()
_LOGGER.debug("%s access from Observer", request.path)
request_from = self.sys_plugins.observer
# Add-on
addon = None
if supervisor_token and not request_from:
addon = self.sys_addons.from_token(supervisor_token)
# Check Add-on API access
if addon and ADDONS_API_BYPASS.match(request.path):
_LOGGER.debug("Passthrough %s from %s", request.path, addon.slug)
request_from = addon
elif addon and addon.access_hassio_api:
# Check Role
if ADDONS_ROLE_ACCESS[addon.hassio_role].match(request.path):
_LOGGER.info("%s access from %s", request.path, addon.slug)
request_from = addon
else:
_LOGGER.warning("%s no role for %s", request.path, addon.slug)
if request_from:
request[REQUEST_FROM] = request_from
return await handler(request)
_LOGGER.error("Invalid token for access %s", request.path)
raise HTTPForbidden()

View File

@ -1,200 +1,50 @@
"""Handle security part of this API.""" """Init file for Supervisor Security RESTful API."""
import logging import logging
import re from typing import Any, Dict
from aiohttp.web import Request, RequestHandler, Response, middleware from aiohttp import web
from aiohttp.web_exceptions import HTTPForbidden, HTTPUnauthorized import voluptuous as vol
from ..const import ( from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED
REQUEST_FROM, from ..coresys import CoreSysAttributes
ROLE_ADMIN, from .utils import api_process, api_validate
ROLE_BACKUP,
ROLE_DEFAULT,
ROLE_HOMEASSISTANT,
ROLE_MANAGER,
CoreState,
)
from ..coresys import CoreSys, CoreSysAttributes
from .utils import api_return_error, excract_supervisor_token
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
# fmt: off # pylint: disable=no-value-for-parameter
SCHEMA_OPTIONS = vol.Schema(
# Block Anytime {
BLACKLIST = re.compile( vol.Optional(ATTR_PWNED): vol.Boolean(),
r"^(?:" vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(),
r"|/homeassistant/api/hassio/.*" vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(),
r"|/core/api/hassio/.*" }
r")$"
) )
# Free to call or have own security concepts
NO_SECURITY_CHECK = re.compile(
r"^(?:"
r"|/homeassistant/api/.*"
r"|/homeassistant/websocket"
r"|/core/api/.*"
r"|/core/websocket"
r"|/supervisor/ping"
r")$"
)
# Observer allow API calls class APISecurity(CoreSysAttributes):
OBSERVER_CHECK = re.compile( """Handle RESTful API for Security functions."""
r"^(?:"
r"|/.+/info"
r")$"
)
# Can called by every add-on @api_process
ADDONS_API_BYPASS = re.compile( async def info(self, request: web.Request) -> Dict[str, Any]:
r"^(?:" """Return Security information."""
r"|/addons/self/(?!security|update)[^/]+" return {
r"|/addons/self/options/config" ATTR_CONTENT_TRUST: self.sys_security.content_trust,
r"|/info" ATTR_PWNED: self.sys_security.pwned,
r"|/hardware/trigger" ATTR_FORCE_SECURITY: self.sys_security.force,
r"|/services.*" }
r"|/discovery.*"
r"|/auth"
r")$"
)
# Policy role add-on API access @api_process
ADDONS_ROLE_ACCESS = { async def options(self, request: web.Request) -> None:
ROLE_DEFAULT: re.compile( """Set options for Security."""
r"^(?:" body = await api_validate(SCHEMA_OPTIONS, request)
r"|/.+/info"
r"|/addons"
r")$"
),
ROLE_HOMEASSISTANT: re.compile(
r"^(?:"
r"|/core/.+"
r"|/homeassistant/.+"
r")$"
),
ROLE_BACKUP: re.compile(
r"^(?:"
r"|/snapshots.*"
r")$"
),
ROLE_MANAGER: re.compile(
r"^(?:"
r"|/addons(?:/[^/]+/(?!security).+|/reload)?"
r"|/audio/.+"
r"|/auth/cache"
r"|/cli/.+"
r"|/core/.+"
r"|/dns/.+"
r"|/docker/.+"
r"|/jobs/.+"
r"|/hardware/.+"
r"|/hassos/.+"
r"|/homeassistant/.+"
r"|/host/.+"
r"|/multicast/.+"
r"|/network/.+"
r"|/observer/.+"
r"|/os/.+"
r"|/resolution/.+"
r"|/snapshots.*"
r"|/store.*"
r"|/supervisor/.+"
r")$"
),
ROLE_ADMIN: re.compile(
r".*"
),
}
# fmt: on if ATTR_PWNED in body:
self.sys_security.pwned = body[ATTR_PWNED]
if ATTR_CONTENT_TRUST in body:
self.sys_security.content_trust = body[ATTR_CONTENT_TRUST]
if ATTR_FORCE_SECURITY in body:
self.sys_security.force = body[ATTR_FORCE_SECURITY]
self.sys_security.save_data()
class SecurityMiddleware(CoreSysAttributes): await self.sys_resolution.evaluate.evaluate_system()
"""Security middleware functions."""
def __init__(self, coresys: CoreSys):
"""Initialize security middleware."""
self.coresys: CoreSys = coresys
@middleware
async def system_validation(
self, request: Request, handler: RequestHandler
) -> Response:
"""Check if core is ready to response."""
if self.sys_core.state not in (
CoreState.STARTUP,
CoreState.RUNNING,
CoreState.FREEZE,
):
return api_return_error(
message=f"System is not ready with state: {self.sys_core.state.value}"
)
return await handler(request)
@middleware
async def token_validation(
self, request: Request, handler: RequestHandler
) -> Response:
"""Check security access of this layer."""
request_from = None
supervisor_token = excract_supervisor_token(request)
# Blacklist
if BLACKLIST.match(request.path):
_LOGGER.error("%s is blacklisted!", request.path)
raise HTTPForbidden()
# Ignore security check
if NO_SECURITY_CHECK.match(request.path):
_LOGGER.debug("Passthrough %s", request.path)
return await handler(request)
# Not token
if not supervisor_token:
_LOGGER.warning("No API token provided for %s", request.path)
raise HTTPUnauthorized()
# Home-Assistant
if supervisor_token == self.sys_homeassistant.supervisor_token:
_LOGGER.debug("%s access from Home Assistant", request.path)
request_from = self.sys_homeassistant
# Host
if supervisor_token == self.sys_plugins.cli.supervisor_token:
_LOGGER.debug("%s access from Host", request.path)
request_from = self.sys_host
# Observer
if supervisor_token == self.sys_plugins.observer.supervisor_token:
if not OBSERVER_CHECK.match(request.url):
_LOGGER.warning("%s invalid Observer access", request.path)
raise HTTPForbidden()
_LOGGER.debug("%s access from Observer", request.path)
request_from = self.sys_plugins.observer
# Add-on
addon = None
if supervisor_token and not request_from:
addon = self.sys_addons.from_token(supervisor_token)
# Check Add-on API access
if addon and ADDONS_API_BYPASS.match(request.path):
_LOGGER.debug("Passthrough %s from %s", request.path, addon.slug)
request_from = addon
elif addon and addon.access_hassio_api:
# Check Role
if ADDONS_ROLE_ACCESS[addon.hassio_role].match(request.path):
_LOGGER.info("%s access from %s", request.path, addon.slug)
request_from = addon
else:
_LOGGER.warning("%s no role for %s", request.path, addon.slug)
if request_from:
request[REQUEST_FROM] = request_from
return await handler(request)
_LOGGER.error("Invalid token for access %s", request.path)
raise HTTPForbidden()

View File

@ -116,8 +116,6 @@ class APISupervisor(CoreSysAttributes):
ATTR_DEBUG: self.sys_config.debug, ATTR_DEBUG: self.sys_config.debug,
ATTR_DEBUG_BLOCK: self.sys_config.debug_block, ATTR_DEBUG_BLOCK: self.sys_config.debug_block,
ATTR_DIAGNOSTICS: self.sys_config.diagnostics, ATTR_DIAGNOSTICS: self.sys_config.diagnostics,
ATTR_CONTENT_TRUST: self.sys_config.content_trust,
ATTR_FORCE_SECURITY: self.sys_config.force_security,
ATTR_ADDONS: list_addons, ATTR_ADDONS: list_addons,
ATTR_ADDONS_REPOSITORIES: self.sys_config.addons_repositories, ATTR_ADDONS_REPOSITORIES: self.sys_config.addons_repositories,
} }
@ -148,11 +146,13 @@ class APISupervisor(CoreSysAttributes):
if ATTR_LOGGING in body: if ATTR_LOGGING in body:
self.sys_config.logging = body[ATTR_LOGGING] self.sys_config.logging = body[ATTR_LOGGING]
# REMOVE: 2021.7
if ATTR_CONTENT_TRUST in body: if ATTR_CONTENT_TRUST in body:
self.sys_config.content_trust = body[ATTR_CONTENT_TRUST] self.sys_security.content_trust = body[ATTR_CONTENT_TRUST]
# REMOVE: 2021.7
if ATTR_FORCE_SECURITY in body: if ATTR_FORCE_SECURITY in body:
self.sys_config.force_security = body[ATTR_FORCE_SECURITY] self.sys_security.force = body[ATTR_FORCE_SECURITY]
if ATTR_ADDONS_REPOSITORIES in body: if ATTR_ADDONS_REPOSITORIES in body:
new = set(body[ATTR_ADDONS_REPOSITORIES]) new = set(body[ATTR_ADDONS_REPOSITORIES])

View File

@ -45,6 +45,7 @@ from .misc.scheduler import Scheduler
from .misc.tasks import Tasks from .misc.tasks import Tasks
from .plugins import PluginManager from .plugins import PluginManager
from .resolution.module import ResolutionManager from .resolution.module import ResolutionManager
from .security import Security
from .services import ServiceManager from .services import ServiceManager
from .snapshots import SnapshotManager from .snapshots import SnapshotManager
from .store import StoreManager from .store import StoreManager
@ -82,6 +83,7 @@ async def initialize_coresys() -> CoreSys:
coresys.dbus = DBusManager(coresys) coresys.dbus = DBusManager(coresys)
coresys.hassos = HassOS(coresys) coresys.hassos = HassOS(coresys)
coresys.scheduler = Scheduler(coresys) coresys.scheduler = Scheduler(coresys)
coresys.security = Security(coresys)
# diagnostics # diagnostics
setup_diagnostics(coresys) setup_diagnostics(coresys)
@ -188,10 +190,10 @@ def initialize_system_data(coresys: CoreSys) -> None:
# Check if ENV is in development mode # Check if ENV is in development mode
if coresys.dev: if coresys.dev:
_LOGGER.warning("Environment variables 'SUPERVISOR_DEV' is set") _LOGGER.warning("Environment variables 'SUPERVISOR_DEV' is set")
coresys.updater.channel = UpdateChannel.DEV
coresys.config.logging = LogLevel.DEBUG coresys.config.logging = LogLevel.DEBUG
coresys.config.content_trust = False
coresys.config.debug = True coresys.config.debug = True
coresys.updater.channel = UpdateChannel.DEV
coresys.security.content_trust = False
def migrate_system_env(coresys: CoreSys) -> None: def migrate_system_env(coresys: CoreSys) -> None:

View File

@ -9,11 +9,9 @@ from awesomeversion import AwesomeVersion
from .const import ( from .const import (
ATTR_ADDONS_CUSTOM_LIST, ATTR_ADDONS_CUSTOM_LIST,
ATTR_CONTENT_TRUST,
ATTR_DEBUG, ATTR_DEBUG,
ATTR_DEBUG_BLOCK, ATTR_DEBUG_BLOCK,
ATTR_DIAGNOSTICS, ATTR_DIAGNOSTICS,
ATTR_FORCE_SECURITY,
ATTR_IMAGE, ATTR_IMAGE,
ATTR_LAST_BOOT, ATTR_LAST_BOOT,
ATTR_LOGGING, ATTR_LOGGING,
@ -159,26 +157,6 @@ class CoreConfig(FileConfiguration):
"""Set last boot datetime.""" """Set last boot datetime."""
self._data[ATTR_LAST_BOOT] = value.isoformat() self._data[ATTR_LAST_BOOT] = value.isoformat()
@property
def content_trust(self) -> bool:
"""Return if content trust is enabled/disabled."""
return self._data[ATTR_CONTENT_TRUST]
@content_trust.setter
def content_trust(self, value: bool) -> None:
"""Set content trust is enabled/disabled."""
self._data[ATTR_CONTENT_TRUST] = value
@property
def force_security(self) -> bool:
"""Return if force security is enabled/disabled."""
return self._data[ATTR_FORCE_SECURITY]
@force_security.setter
def force_security(self, value: bool) -> None:
"""Set force security is enabled/disabled."""
self._data[ATTR_FORCE_SECURITY] = value
@property @property
def path_supervisor(self) -> Path: def path_supervisor(self) -> Path:
"""Return Supervisor data path.""" """Return Supervisor data path."""

View File

@ -20,6 +20,7 @@ FILE_HASSIO_HOMEASSISTANT = Path(SUPERVISOR_DATA, "homeassistant.json")
FILE_HASSIO_INGRESS = Path(SUPERVISOR_DATA, "ingress.json") FILE_HASSIO_INGRESS = Path(SUPERVISOR_DATA, "ingress.json")
FILE_HASSIO_SERVICES = Path(SUPERVISOR_DATA, "services.json") FILE_HASSIO_SERVICES = Path(SUPERVISOR_DATA, "services.json")
FILE_HASSIO_UPDATER = Path(SUPERVISOR_DATA, "updater.json") FILE_HASSIO_UPDATER = Path(SUPERVISOR_DATA, "updater.json")
FILE_HASSIO_SECURITY = Path(SUPERVISOR_DATA, "security.json")
FILE_SUFFIX_CONFIGURATION = [".yaml", ".yml", ".json"] FILE_SUFFIX_CONFIGURATION = [".yaml", ".yml", ".json"]
@ -316,6 +317,7 @@ ATTR_WEBUI = "webui"
ATTR_WIFI = "wifi" ATTR_WIFI = "wifi"
ATTR_CONTENT_TRUST = "content_trust" ATTR_CONTENT_TRUST = "content_trust"
ATTR_FORCE_SECURITY = "force_security" ATTR_FORCE_SECURITY = "force_security"
ATTR_PWNED = "pwned"
PROVIDE_SERVICE = "provide" PROVIDE_SERVICE = "provide"
NEED_SERVICE = "need" NEED_SERVICE = "need"

View File

@ -4,8 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os import os
from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional, TypeVar
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Coroutine, Optional, TypeVar
import aiohttp import aiohttp
import sentry_sdk import sentry_sdk
@ -13,9 +12,6 @@ import sentry_sdk
from .config import CoreConfig from .config import CoreConfig
from .const import ENV_SUPERVISOR_DEV from .const import ENV_SUPERVISOR_DEV
from .docker import DockerAPI from .docker import DockerAPI
from .exceptions import CodeNotaryError, CodeNotaryUntrusted
from .resolution.const import UnhealthyReason
from .utils.codenotary import vcn_validate
if TYPE_CHECKING: if TYPE_CHECKING:
from .addons import AddonManager from .addons import AddonManager
@ -40,6 +36,7 @@ if TYPE_CHECKING:
from .store import StoreManager from .store import StoreManager
from .supervisor import Supervisor from .supervisor import Supervisor
from .updater import Updater from .updater import Updater
from .security import Security
T = TypeVar("T") T = TypeVar("T")
@ -90,6 +87,7 @@ class CoreSys:
self._plugins: Optional[PluginManager] = None self._plugins: Optional[PluginManager] = None
self._resolution: Optional[ResolutionManager] = None self._resolution: Optional[ResolutionManager] = None
self._jobs: Optional[JobManager] = None self._jobs: Optional[JobManager] = None
self._security: Optional[Security] = None
@property @property
def dev(self) -> bool: def dev(self) -> bool:
@ -415,6 +413,20 @@ class CoreSys:
raise RuntimeError("resolution manager already set!") raise RuntimeError("resolution manager already set!")
self._resolution = value self._resolution = value
@property
def security(self) -> Security:
"""Return security object."""
if self._security is None:
raise RuntimeError("security not set!")
return self._security
@security.setter
def security(self, value: Security) -> None:
"""Set a security object."""
if self._security:
raise RuntimeError("security already set!")
self._security = value
@property @property
def jobs(self) -> JobManager: def jobs(self) -> JobManager:
"""Return resolution manager object.""" """Return resolution manager object."""
@ -599,6 +611,11 @@ class CoreSysAttributes:
"""Return Resolution manager object.""" """Return Resolution manager object."""
return self.coresys.resolution return self.coresys.resolution
@property
def sys_security(self) -> Security:
"""Return Security object."""
return self.coresys.security
@property @property
def sys_jobs(self) -> JobManager: def sys_jobs(self) -> JobManager:
"""Return Job manager object.""" """Return Job manager object."""
@ -617,21 +634,3 @@ class CoreSysAttributes:
def sys_capture_exception(self, err: Exception) -> None: def sys_capture_exception(self, err: Exception) -> None:
"""Capture a exception.""" """Capture a exception."""
sentry_sdk.capture_exception(err) sentry_sdk.capture_exception(err)
async def sys_verify_content(
self, checksum: Optional[str] = None, path: Optional[Path] = None
) -> Awaitable[None]:
"""Verify content from HA org."""
if not self.sys_config.content_trust:
_LOGGER.warning("Disabled content-trust, skip validation")
return
try:
await vcn_validate(checksum, path, org="home-assistant.io")
except CodeNotaryUntrusted:
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
raise
except CodeNotaryError:
if self.sys_config.force_security:
raise
return

View File

@ -621,6 +621,6 @@ class DockerInterface(CoreSysAttributes):
"""Validate trust of content.""" """Validate trust of content."""
checksum = image_id.partition(":")[2] checksum = image_id.partition(":")[2]
job = asyncio.run_coroutine_threadsafe( job = asyncio.run_coroutine_threadsafe(
self.sys_verify_content(checksum=checksum), self.sys_loop self.sys_security.verify_own_content(checksum=checksum), self.sys_loop
) )
job.result(timeout=20) job.result(timeout=20)

View File

@ -7,7 +7,6 @@ from ...coresys import CoreSys
from ...exceptions import PwnedConnectivityError, PwnedError, PwnedSecret from ...exceptions import PwnedConnectivityError, PwnedError, PwnedSecret
from ...jobs.const import JobCondition, JobExecutionLimit from ...jobs.const import JobCondition, JobExecutionLimit
from ...jobs.decorator import Job from ...jobs.decorator import Job
from ...utils.pwned import check_pwned_password
from ..const import ContextType, IssueType, SuggestionType from ..const import ContextType, IssueType, SuggestionType
from .base import CheckBase from .base import CheckBase
@ -20,6 +19,13 @@ def setup(coresys: CoreSys) -> CheckBase:
class CheckAddonPwned(CheckBase): class CheckAddonPwned(CheckBase):
"""CheckAddonPwned class for check.""" """CheckAddonPwned class for check."""
@property
def enabled(self) -> bool:
"""Return True if the check is enabled."""
if not self.sys_security.pwned:
return False
return super().enabled
@Job( @Job(
conditions=[JobCondition.INTERNET_SYSTEM], conditions=[JobCondition.INTERNET_SYSTEM],
limit=JobExecutionLimit.THROTTLE, limit=JobExecutionLimit.THROTTLE,
@ -37,7 +43,7 @@ class CheckAddonPwned(CheckBase):
# check passwords # check passwords
for secret in secrets: for secret in secrets:
try: try:
await check_pwned_password(self.sys_websession, secret) await self.sys_security.verify_secret(secret)
except PwnedConnectivityError: except PwnedConnectivityError:
self.sys_supervisor.connectivity = False self.sys_supervisor.connectivity = False
return return
@ -75,7 +81,7 @@ class CheckAddonPwned(CheckBase):
# Check if still pwned # Check if still pwned
for secret in secrets: for secret in secrets:
try: try:
await check_pwned_password(self.sys_websession, secret) await self.sys_security.verify_secret(secret)
except PwnedSecret: except PwnedSecret:
return True return True
except PwnedError: except PwnedError:

View File

@ -32,4 +32,4 @@ class EvaluateContentTrust(EvaluateBase):
async def evaluate(self) -> None: async def evaluate(self) -> None:
"""Run evaluation.""" """Run evaluation."""
return not self.sys_config.content_trust return not self.sys_security.content_trust

View File

@ -38,12 +38,12 @@ class EvaluateSourceMods(EvaluateBase):
async def evaluate(self) -> None: async def evaluate(self) -> None:
"""Run evaluation.""" """Run evaluation."""
if not self.sys_config.content_trust: if not self.sys_security.content_trust:
_LOGGER.warning("Disabled content-trust, skipping evaluation") _LOGGER.warning("Disabled content-trust, skipping evaluation")
return return
try: try:
await self.sys_verify_content(path=_SUPERVISOR_SOURCE) await self.sys_security.verify_own_content(path=_SUPERVISOR_SOURCE)
except CodeNotaryUntrusted: except CodeNotaryUntrusted:
return True return True
except CodeNotaryError: except CodeNotaryError:

90
supervisor/security.py Normal file
View File

@ -0,0 +1,90 @@
"""Fetch last versions from webserver."""
import logging
from pathlib import Path
from typing import Awaitable, Optional
from .const import (
ATTR_CONTENT_TRUST,
ATTR_FORCE_SECURITY,
ATTR_PWNED,
FILE_HASSIO_SECURITY,
)
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import CodeNotaryError, CodeNotaryUntrusted, PwnedError
from .resolution.const import UnhealthyReason
from .utils.codenotary import vcn_validate
from .utils.common import FileConfiguration
from .utils.pwned import check_pwned_password
from .validate import SCHEMA_SECURITY_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__name__)
class Security(FileConfiguration, CoreSysAttributes):
"""Handle Security properties."""
def __init__(self, coresys: CoreSys):
"""Initialize updater."""
super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG)
self.coresys = coresys
@property
def content_trust(self) -> bool:
"""Return if content trust is enabled/disabled."""
return self._data[ATTR_CONTENT_TRUST]
@content_trust.setter
def content_trust(self, value: bool) -> None:
"""Set content trust is enabled/disabled."""
self._data[ATTR_CONTENT_TRUST] = value
@property
def force(self) -> bool:
"""Return if force security is enabled/disabled."""
return self._data[ATTR_FORCE_SECURITY]
@force.setter
def force(self, value: bool) -> None:
"""Set force security is enabled/disabled."""
self._data[ATTR_FORCE_SECURITY] = value
@property
def pwned(self) -> bool:
"""Return if pwned is enabled/disabled."""
return self._data[ATTR_PWNED]
@pwned.setter
def pwned(self, value: bool) -> None:
"""Set pwned is enabled/disabled."""
self._data[ATTR_PWNED] = value
async def verify_own_content(
self, checksum: Optional[str] = None, path: Optional[Path] = None
) -> Awaitable[None]:
"""Verify content from HA org."""
if not self.content_trust:
_LOGGER.warning("Disabled content-trust, skip validation")
return
try:
await vcn_validate(checksum, path, org="home-assistant.io")
except CodeNotaryUntrusted:
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
raise
except CodeNotaryError:
if self.force:
raise
return
async def verify_secret(self, pwned_hash: str) -> None:
"""Verify pwned state of a secret."""
if not self.pwned:
_LOGGER.warning("Disabled pwned, skip validation")
return
try:
await check_pwned_password(self.sys_websession, pwned_hash)
except PwnedError:
if self.force:
raise
return

View File

@ -127,7 +127,7 @@ class Supervisor(CoreSysAttributes):
# Validate # Validate
try: try:
await self.sys_verify_content(checksum=calc_checksum(data)) await self.sys_security.verify_own_content(checksum=calc_checksum(data))
except CodeNotaryUntrusted as err: except CodeNotaryUntrusted as err:
raise SupervisorAppArmorError( raise SupervisorAppArmorError(
"Content-Trust is broken for the AppArmor profile fetch!", "Content-Trust is broken for the AppArmor profile fetch!",

View File

@ -207,7 +207,7 @@ class Updater(FileConfiguration, CoreSysAttributes):
# Validate # Validate
try: try:
await self.sys_verify_content(checksum=calc_checksum(data)) await self.sys_security.verify_own_content(checksum=calc_checksum(data))
except CodeNotaryUntrusted as err: except CodeNotaryUntrusted as err:
raise UpdaterError( raise UpdaterError(
"Content-Trust is broken for the version file fetch!", _LOGGER.critical "Content-Trust is broken for the version file fetch!", _LOGGER.critical

View File

@ -23,6 +23,7 @@ async def check_pwned_password(websession: aiohttp.ClientSession, sha1_pw: str)
if sha1_short in _CACHE: if sha1_short in _CACHE:
raise PwnedSecret() raise PwnedSecret()
_LOGGER.debug("Check pwned state of %s", sha1_short)
try: try:
async with websession.get( async with websession.get(
_API_CALL.format(hash=sha1_short), timeout=aiohttp.ClientTimeout(total=10) _API_CALL.format(hash=sha1_short), timeout=aiohttp.ClientTimeout(total=10)

View File

@ -27,6 +27,7 @@ from .const import (
ATTR_OTA, ATTR_OTA,
ATTR_PASSWORD, ATTR_PASSWORD,
ATTR_PORTS, ATTR_PORTS,
ATTR_PWNED,
ATTR_REGISTRIES, ATTR_REGISTRIES,
ATTR_SESSION, ATTR_SESSION,
ATTR_SUPERVISOR, ATTR_SUPERVISOR,
@ -150,8 +151,6 @@ SCHEMA_SUPERVISOR_CONFIG = vol.Schema(
vol.Optional(ATTR_DEBUG, default=False): vol.Boolean(), vol.Optional(ATTR_DEBUG, default=False): vol.Boolean(),
vol.Optional(ATTR_DEBUG_BLOCK, default=False): vol.Boolean(), vol.Optional(ATTR_DEBUG_BLOCK, default=False): vol.Boolean(),
vol.Optional(ATTR_DIAGNOSTICS, default=None): vol.Maybe(vol.Boolean()), vol.Optional(ATTR_DIAGNOSTICS, default=None): vol.Maybe(vol.Boolean()),
vol.Optional(ATTR_CONTENT_TRUST, default=True): vol.Boolean(),
vol.Optional(ATTR_FORCE_SECURITY, default=False): vol.Boolean(),
}, },
extra=vol.REMOVE_EXTRA, extra=vol.REMOVE_EXTRA,
) )
@ -185,3 +184,14 @@ SCHEMA_INGRESS_CONFIG = vol.Schema(
}, },
extra=vol.REMOVE_EXTRA, extra=vol.REMOVE_EXTRA,
) )
# pylint: disable=no-value-for-parameter
SCHEMA_SECURITY_CONFIG = vol.Schema(
{
vol.Optional(ATTR_CONTENT_TRUST, default=True): vol.Boolean(),
vol.Optional(ATTR_PWNED, default=True): vol.Boolean(),
vol.Optional(ATTR_FORCE_SECURITY, default=False): vol.Boolean(),
},
extra=vol.REMOVE_EXTRA,
)

View File

@ -0,0 +1,61 @@
"""Test API security layer."""
from aiohttp import web
import pytest
from supervisor.api import RestAPI
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
# pylint: disable=redefined-outer-name
@pytest.fixture
async def api_system(aiohttp_client, run_dir, coresys: CoreSys):
"""Fixture for RestAPI client."""
api = RestAPI(coresys)
api.webapp = web.Application()
await api.load()
api.webapp.middlewares.append(api.security.system_validation)
yield await aiohttp_client(api.webapp)
@pytest.mark.asyncio
async def test_api_security_system_initialize(api_system, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.INITIALIZE
resp = await api_system.get("/supervisor/ping")
result = await resp.json()
assert resp.status == 400
assert result["result"] == "error"
@pytest.mark.asyncio
async def test_api_security_system_setup(api_system, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.SETUP
resp = await api_system.get("/supervisor/ping")
result = await resp.json()
assert resp.status == 400
assert result["result"] == "error"
@pytest.mark.asyncio
async def test_api_security_system_running(api_system, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.RUNNING
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200
@pytest.mark.asyncio
async def test_api_security_system_startup(api_system, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.STARTUP
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200

View File

@ -1,61 +1,35 @@
"""Test API security layer.""" """Test Supervisor API."""
from aiohttp import web
import pytest import pytest
from supervisor.api import RestAPI
from supervisor.const import CoreState
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
# pylint: disable=redefined-outer-name
@pytest.mark.asyncio
async def test_api_security_options_force_security(api_client, coresys: CoreSys):
"""Test security options force security."""
assert not coresys.security.force
@pytest.fixture await api_client.post("/security/options", json={"force_security": True})
async def api_system(aiohttp_client, run_dir, coresys: CoreSys):
"""Fixture for RestAPI client."""
api = RestAPI(coresys)
api.webapp = web.Application()
await api.load()
api.webapp.middlewares.append(api.security.system_validation) assert coresys.security.force
yield await aiohttp_client(api.webapp)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_api_security_system_initialize(api_system, coresys: CoreSys): async def test_api_security_options_content_trust(api_client, coresys: CoreSys):
"""Test security.""" """Test security options content trust."""
coresys.core.state = CoreState.INITIALIZE assert coresys.security.content_trust
resp = await api_system.get("/supervisor/ping") await api_client.post("/security/options", json={"content_trust": False})
result = await resp.json()
assert resp.status == 400 assert not coresys.security.content_trust
assert result["result"] == "error"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_api_security_system_setup(api_system, coresys: CoreSys): async def test_api_security_options_pwned(api_client, coresys: CoreSys):
"""Test security.""" """Test security options pwned."""
coresys.core.state = CoreState.SETUP assert coresys.security.pwned
resp = await api_system.get("/supervisor/ping") await api_client.post("/security/options", json={"pwned": False})
result = await resp.json()
assert resp.status == 400
assert result["result"] == "error"
assert not coresys.security.pwned
@pytest.mark.asyncio
async def test_api_security_system_running(api_system, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.RUNNING
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200
@pytest.mark.asyncio
async def test_api_security_system_startup(api_system, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.STARTUP
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200

View File

@ -6,20 +6,10 @@ from supervisor.coresys import CoreSys
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_api_supervisor_options_force_security(api_client, coresys: CoreSys): async def test_api_supervisor_options_debug(api_client, coresys: CoreSys):
"""Test supervisor options force security.""" """Test security options force security."""
assert not coresys.config.force_security assert not coresys.config.debug
await api_client.post("/supervisor/options", json={"force_security": True}) await api_client.post("/supervisor/options", json={"debug": True})
assert coresys.config.force_security assert coresys.config.debug
@pytest.mark.asyncio
async def test_api_supervisor_options_content_trust(api_client, coresys: CoreSys):
"""Test supervisor options content trust."""
assert coresys.config.content_trust
await api_client.post("/supervisor/options", json={"content_trust": False})
assert not coresys.config.content_trust

View File

@ -35,29 +35,20 @@ async def test_check(coresys: CoreSys):
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
with patch( coresys.security.verify_secret = AsyncMock(side_effect=PwnedSecret)
"supervisor.resolution.checks.addon_pwned.check_pwned_password", await addon_pwned.run_check.__wrapped__(addon_pwned)
AsyncMock(side_effect=PwnedSecret()), assert not coresys.security.verify_secret.called
) as mock:
await addon_pwned.run_check.__wrapped__(addon_pwned)
assert not mock.called
addon.pwned.add("123456") addon.pwned.add("123456")
with patch( coresys.security.verify_secret = AsyncMock(return_value=None)
"supervisor.resolution.checks.addon_pwned.check_pwned_password", await addon_pwned.run_check.__wrapped__(addon_pwned)
AsyncMock(return_value=None), assert coresys.security.verify_secret.called
) as mock:
await addon_pwned.run_check.__wrapped__(addon_pwned)
assert mock.called
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
with patch( coresys.security.verify_secret = AsyncMock(side_effect=PwnedSecret)
"supervisor.resolution.checks.addon_pwned.check_pwned_password", await addon_pwned.run_check.__wrapped__(addon_pwned)
AsyncMock(side_effect=PwnedSecret()), assert coresys.security.verify_secret.called
) as mock:
await addon_pwned.run_check.__wrapped__(addon_pwned)
assert mock.called
assert len(coresys.resolution.issues) == 1 assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[-1].type == IssueType.PWNED assert coresys.resolution.issues[-1].type == IssueType.PWNED
@ -75,24 +66,15 @@ async def test_approve(coresys: CoreSys):
coresys.addons.local[addon.slug] = addon coresys.addons.local[addon.slug] = addon
addon.pwned.add("123456") addon.pwned.add("123456")
with patch( coresys.security.verify_secret = AsyncMock(side_effect=PwnedSecret)
"supervisor.resolution.checks.addon_pwned.check_pwned_password", assert await addon_pwned.approve_check(reference=addon.slug)
AsyncMock(side_effect=PwnedSecret()),
):
assert await addon_pwned.approve_check(reference=addon.slug)
with patch( coresys.security.verify_secret = AsyncMock(return_value=None)
"supervisor.resolution.checks.addon_pwned.check_pwned_password", assert not await addon_pwned.approve_check(reference=addon.slug)
AsyncMock(return_value=None),
):
assert not await addon_pwned.approve_check(reference=addon.slug)
addon.is_installed = False addon.is_installed = False
with patch( coresys.security.verify_secret = AsyncMock(side_effect=PwnedSecret)
"supervisor.resolution.checks.addon_pwned.check_pwned_password", assert not await addon_pwned.approve_check(reference=addon.slug)
AsyncMock(side_effect=PwnedSecret()),
):
assert not await addon_pwned.approve_check(reference=addon.slug)
async def test_did_run(coresys: CoreSys): async def test_did_run(coresys: CoreSys):

View File

@ -15,7 +15,7 @@ async def test_evaluation(coresys: CoreSys):
await job_conditions() await job_conditions()
assert job_conditions.reason not in coresys.resolution.unsupported assert job_conditions.reason not in coresys.resolution.unsupported
coresys.config.content_trust = False coresys.security.content_trust = False
await job_conditions() await job_conditions()
assert job_conditions.reason in coresys.resolution.unsupported assert job_conditions.reason in coresys.resolution.unsupported

View File

@ -14,26 +14,17 @@ async def test_evaluation(coresys: CoreSys):
coresys.core.state = CoreState.RUNNING coresys.core.state = CoreState.RUNNING
assert sourcemods.reason not in coresys.resolution.unsupported assert sourcemods.reason not in coresys.resolution.unsupported
with patch( coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.sys_verify_content", await sourcemods()
AsyncMock(side_effect=CodeNotaryUntrusted), assert sourcemods.reason in coresys.resolution.unsupported
):
await sourcemods()
assert sourcemods.reason in coresys.resolution.unsupported
with patch( coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryError)
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.sys_verify_content", await sourcemods()
AsyncMock(side_effect=CodeNotaryError), assert sourcemods.reason not in coresys.resolution.unsupported
):
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
with patch( coresys.security.verify_own_content = AsyncMock()
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.sys_verify_content", await sourcemods()
AsyncMock(), assert sourcemods.reason not in coresys.resolution.unsupported
):
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
async def test_did_run(coresys: CoreSys): async def test_did_run(coresys: CoreSys):

View File

@ -11,7 +11,7 @@ URL_TEST = "https://version.home-assistant.io/stable.json"
async def test_fetch_versions(coresys: CoreSys) -> None: async def test_fetch_versions(coresys: CoreSys) -> None:
"""Test download and sync version.""" """Test download and sync version."""
coresys.config.force_security = True coresys.security.force = True
await coresys.updater.fetch_data() await coresys.updater.fetch_data()
async with coresys.websession.get(URL_TEST) as request: async with coresys.websession.get(URL_TEST) as request: