Add core version check (#2436)

* Add core version check

* Fix version handling, rename and use correct paths

* simplify check

* wrap approve check
This commit is contained in:
Joakim Sørensen 2021-01-15 21:13:42 +01:00 committed by GitHub
parent e1068997ea
commit 88eb9511bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 164 additions and 2 deletions

View File

@ -4,6 +4,7 @@ from typing import List
from ..coresys import CoreSys, CoreSysAttributes
from .checks.base import CheckBase
from .checks.core_security import CheckCoreSecurity
from .checks.free_space import CheckFreeSpace
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -16,12 +17,13 @@ class ResolutionCheck(CoreSysAttributes):
"""Initialize the checks class."""
self.coresys = coresys
self._core_security = CheckCoreSecurity(coresys)
self._free_space = CheckFreeSpace(coresys)
@property
def all_tests(self) -> List[CheckBase]:
"""Return all list of all checks."""
return [self._free_space]
return [self._core_security, self._free_space]
async def check_system(self) -> None:
"""Check the system."""

View File

@ -0,0 +1,62 @@
"""Helpers to check core security."""
from enum import Enum
from pathlib import Path
from typing import List
from awesomeversion import AwesomeVersion, AwesomeVersionException
from ...const import CoreState
from ..const import ContextType, IssueType, SuggestionType
from .base import CheckBase
class SecurityReference(str, Enum):
"""Version references."""
CUSTOM_COMPONENTS_BELOW_2021_1_3 = "custom_components_below_2021_1_3"
class CheckCoreSecurity(CheckBase):
"""CheckCoreSecurity class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
try:
if self.sys_homeassistant.version < AwesomeVersion("2021.1.3"):
if Path(
self.sys_config.path_homeassistant, "custom_components"
).exists():
self.sys_resolution.create_issue(
IssueType.SECURITY,
ContextType.CORE,
reference=SecurityReference.CUSTOM_COMPONENTS_BELOW_2021_1_3,
suggestions=[SuggestionType.EXECUTE_UPDATE],
)
except AwesomeVersionException:
return
async def approve_check(self) -> bool:
"""Approve check if it is affected by issue."""
try:
if self.sys_homeassistant.version >= AwesomeVersion("2021.1.3"):
return False
except AwesomeVersionException:
return True
if not Path(self.sys_config.path_homeassistant, "custom_components").exists():
return False
return True
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.SECURITY
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.CORE
@property
def states(self) -> List[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -50,6 +50,7 @@ class IssueType(str, Enum):
DOCKER_RATELIMIT = "docker_ratelimit"
CORRUPT_DOCKER = "corrupt_docker"
CORRUPT_REPOSITORY = "corrupt_repository"
SECURITY = "security"
MISSING_IMAGE = "missing_image"
UPDATE_FAILED = "update_failed"
UPDATE_ROLLBACK = "update_rollback"

View File

@ -8,7 +8,8 @@ import logging
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HomeAssistantAPIError
from .const import IssueType
from .checks.core_security import SecurityReference
from .const import ContextType, IssueType
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -39,6 +40,18 @@ class ResolutionNotify(CoreSysAttributes):
"notification_id": "supervisor_issue_free_space",
}
)
if issue.type == IssueType.SECURITY and issue.context == ContextType.CORE:
if (
issue.reference
== SecurityReference.CUSTOM_COMPONENTS_BELOW_2021_1_3
):
messages.append(
{
"title": "Security notification",
"message": "The Supervisor detected that this version of Home Assistant is insecure. [Update as soon as possible.](/hassio/dashboard)\n\nFor more information see the [Security bulletin](https://www.home-assistant.io/blog/2021/01/14/security-bulletin/).",
"notification_id": "supervisor_update_home_assistant_2021_1_3",
}
)
for message in messages:
try:

View File

@ -0,0 +1,84 @@
"""Test core version check."""
# pylint: disable=import-error,protected-access
from pathlib import Path
from unittest.mock import patch
from awesomeversion import AwesomeVersion
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.checks.core_security import CheckCoreSecurity
from supervisor.resolution.const import IssueType
async def test_check(coresys: CoreSys, tmp_path):
"""Test check."""
with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path):
core_security = CheckCoreSecurity(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.homeassistant._data["version"] = AwesomeVersion("2021.12.1")
await core_security.run_check()
assert len(coresys.resolution.issues) == 0
coresys.homeassistant._data["version"] = AwesomeVersion("landingpage")
await core_security.run_check()
assert len(coresys.resolution.issues) == 0
coresys.homeassistant._data["version"] = AwesomeVersion(None)
await core_security.run_check()
assert len(coresys.resolution.issues) == 0
coresys.homeassistant._data["version"] = AwesomeVersion("2021.1.2")
await core_security.run_check()
assert len(coresys.resolution.issues) == 0
Path(coresys.config.path_homeassistant, "custom_components").mkdir(parents=True)
await core_security.run_check()
assert coresys.resolution.issues[-1].type == IssueType.SECURITY
async def test_approve(coresys: CoreSys, tmp_path):
"""Test check."""
with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path):
core_security = CheckCoreSecurity(coresys)
coresys.core.state = CoreState.RUNNING
coresys.homeassistant._data["version"] = None
assert await core_security.approve_check()
coresys.homeassistant._data["version"] = AwesomeVersion("2021.1.3")
assert not await core_security.approve_check()
coresys.homeassistant._data["version"] = AwesomeVersion("2021.1.2")
assert not await core_security.approve_check()
Path(coresys.config.path_homeassistant, "custom_components").mkdir(parents=True)
assert await core_security.approve_check()
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
core_security = CheckCoreSecurity(coresys)
should_run = core_security.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.core_security.CheckCoreSecurity.run_check",
return_value=None,
) as check:
for state in should_run:
coresys.core.state = state
await core_security()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await core_security()
check.assert_not_called()
check.reset_mock()