Add mypy to ci and precommit (#5969)

* Add mypy to ci and precommit

* Run precommit mypy in venv

* Fix issues raised in latest version of mypy
This commit is contained in:
Mike Degatano 2025-06-24 05:48:03 -04:00 committed by GitHub
parent 3f921e50b3
commit 3ee7c082ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 211 additions and 72 deletions

View File

@ -10,6 +10,7 @@ on:
env:
DEFAULT_PYTHON: "3.13"
PRE_COMMIT_CACHE: ~/.cache/pre-commit
MYPY_CACHE_VERSION: 1
concurrency:
group: "${{ github.workflow }}-${{ github.ref }}"
@ -286,6 +287,52 @@ jobs:
. venv/bin/activate
pylint supervisor tests
mypy:
name: Check mypy
runs-on: ubuntu-latest
needs: prepare
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.2.2
- name: Set up Python ${{ needs.prepare.outputs.python-version }}
uses: actions/setup-python@v5.6.0
id: python
with:
python-version: ${{ needs.prepare.outputs.python-version }}
- name: Generate partial mypy restore key
id: generate-mypy-key
run: |
mypy_version=$(cat requirements_test.txt | grep mypy | cut -d '=' -f 3)
echo "version=$mypy_version" >> $GITHUB_OUTPUT
echo "key=mypy-${{ env.MYPY_CACHE_VERSION }}-$mypy_version-$(date -u '+%Y-%m-%dT%H:%M:%s')" >> $GITHUB_OUTPUT
- name: Restore Python virtual environment
id: cache-venv
uses: actions/cache@v4.2.3
with:
path: venv
key: >-
${{ runner.os }}-venv-${{ needs.prepare.outputs.python-version }}-${{ hashFiles('requirements.txt') }}-${{ hashFiles('requirements_tests.txt') }}
- name: Fail job if Python cache restore failed
if: steps.cache-venv.outputs.cache-hit != 'true'
run: |
echo "Failed to restore Python virtual environment from cache"
exit 1
- name: Restore mypy cache
uses: actions/cache@v4.2.3
with:
path: .mypy_cache
key: >-
${{ runner.os }}-mypy-${{ needs.prepare.outputs.python-version }}-${{ steps.generate-mypy-key.outputs.key }}
restore-keys: >-
${{ runner.os }}-venv-${{ needs.prepare.outputs.python-version }}-mypy-${{ env.MYPY_CACHE_VERSION }}-${{ steps.generate-mypy-key.outputs.version }}
- name: Register mypy problem matcher
run: |
echo "::add-matcher::.github/workflows/matchers/mypy.json"
- name: Run mypy
run: |
. venv/bin/activate
mypy --ignore-missing-imports supervisor
pytest:
runs-on: ubuntu-latest
needs: prepare

16
.github/workflows/matchers/mypy.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
"problemMatcher": [
{
"owner": "mypy",
"pattern": [
{
"regexp": "^(.+):(\\d+):\\s(error|warning):\\s(.+)$",
"file": 1,
"line": 2,
"severity": 3,
"message": 4
}
]
}
]
}

View File

@ -13,3 +13,15 @@ repos:
- id: check-executables-have-shebangs
stages: [manual]
- id: check-json
- repo: local
hooks:
# Run mypy through our wrapper script in order to get the possible
# pyenv and/or virtualenv activated; it may not have been e.g. if
# committing from a GUI tool that was not launched from an activated
# shell.
- id: mypy
name: mypy
entry: script/run-in-env.sh mypy --ignore-missing-imports
language: script
types_or: [python, pyi]
files: ^supervisor/.+\.(py|pyi)$

View File

@ -1,5 +1,6 @@
astroid==3.3.10
coverage==7.9.1
mypy==1.16.1
pre-commit==4.2.0
pylint==3.3.7
pytest-aiohttp==1.1.0
@ -9,4 +10,7 @@ pytest-timeout==2.4.0
pytest==8.4.1
ruff==0.12.0
time-machine==2.16.0
types-docker==7.1.0.20250523
types-pyyaml==6.0.12.20250516
types-requests==2.32.4.20250611
urllib3==2.5.0

30
script/run-in-env.sh Executable file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env sh
set -eu
# Used in venv activate script.
# Would be an error if undefined.
OSTYPE="${OSTYPE-}"
# Activate pyenv and virtualenv if present, then run the specified command
# pyenv, pyenv-virtualenv
if [ -s .python-version ]; then
PYENV_VERSION=$(head -n 1 .python-version)
export PYENV_VERSION
fi
if [ -n "${VIRTUAL_ENV-}" ] && [ -f "${VIRTUAL_ENV}/bin/activate" ]; then
. "${VIRTUAL_ENV}/bin/activate"
else
# other common virtualenvs
my_path=$(git rev-parse --show-toplevel)
for venv in venv .venv .; do
if [ -f "${my_path}/${venv}/bin/activate" ]; then
. "${my_path}/${venv}/bin/activate"
break
fi
done
fi
exec "$@"

View File

@ -360,7 +360,7 @@ class Addon(AddonModel):
@property
def auto_update(self) -> bool:
"""Return if auto update is enable."""
return self.persist.get(ATTR_AUTO_UPDATE, super().auto_update)
return self.persist.get(ATTR_AUTO_UPDATE, False)
@auto_update.setter
def auto_update(self, value: bool) -> None:

View File

@ -664,12 +664,16 @@ class AddonModel(JobGroup, ABC):
"""Validate if addon is available for current system."""
return self._validate_availability(self.data, logger=_LOGGER.error)
def __eq__(self, other):
"""Compaired add-on objects."""
def __eq__(self, other: Any) -> bool:
"""Compare add-on objects."""
if not isinstance(other, AddonModel):
return False
return self.slug == other.slug
def __hash__(self) -> int:
"""Hash for add-on objects."""
return hash(self.slug)
def _validate_availability(
self, config, *, logger: Callable[..., None] | None = None
) -> None:

View File

@ -3,11 +3,13 @@
import asyncio
from collections.abc import Awaitable
import logging
from typing import Any
from typing import Any, cast
from aiohttp import BasicAuth, web
from aiohttp.hdrs import AUTHORIZATION, CONTENT_TYPE, WWW_AUTHENTICATE
from aiohttp.web import FileField
from aiohttp.web_exceptions import HTTPUnauthorized
from multidict import MultiDictProxy
import voluptuous as vol
from ..addons.addon import Addon
@ -51,7 +53,10 @@ class APIAuth(CoreSysAttributes):
return self.sys_auth.check_login(addon, auth.login, auth.password)
def _process_dict(
self, request: web.Request, addon: Addon, data: dict[str, str]
self,
request: web.Request,
addon: Addon,
data: dict[str, Any] | MultiDictProxy[str | bytes | FileField],
) -> Awaitable[bool]:
"""Process login with dict data.
@ -60,7 +65,15 @@ class APIAuth(CoreSysAttributes):
username = data.get("username") or data.get("user")
password = data.get("password")
return self.sys_auth.check_login(addon, username, password)
# Test that we did receive strings and not something else, raise if so
try:
_ = username.encode and password.encode # type: ignore
except AttributeError:
raise HTTPUnauthorized(headers=REALM_HEADER) from None
return self.sys_auth.check_login(
addon, cast(str, username), cast(str, password)
)
@api_process
async def auth(self, request: web.Request) -> bool:

View File

@ -587,7 +587,7 @@ class CoreSys:
return self._machine_id
@machine_id.setter
def machine_id(self, value: str) -> None:
def machine_id(self, value: str | None) -> None:
"""Set a machine-id type string."""
if self._machine_id:
raise RuntimeError("Machine-ID type already set!")

View File

@ -259,7 +259,7 @@ class NetworkManager(DBusInterfaceProxy):
else:
interface.primary = False
interfaces[interface.name] = interface
interfaces[interface.interface_name] = interface
interfaces[interface.hw_address] = interface
# Disconnect removed devices

View File

@ -49,7 +49,7 @@ class NetworkInterface(DBusInterfaceProxy):
@property
@dbus_property
def name(self) -> str:
def interface_name(self) -> str:
"""Return interface name."""
return self.properties[DBUS_ATTR_DEVICE_INTERFACE]

View File

@ -87,19 +87,19 @@ class HomeAssistantCore(JobGroup):
try:
# Evaluate Version if we lost this information
if not self.sys_homeassistant.version:
if self.sys_homeassistant.version:
version = self.sys_homeassistant.version
else:
self.sys_homeassistant.version = (
await self.instance.get_latest_version()
)
version
) = await self.instance.get_latest_version()
await self.instance.attach(
version=self.sys_homeassistant.version, skip_state_event_if_down=True
)
await self.instance.attach(version=version, skip_state_event_if_down=True)
# Ensure we are using correct image for this system (unless user has overridden it)
if not self.sys_homeassistant.override_image:
await self.instance.check_image(
self.sys_homeassistant.version, self.sys_homeassistant.default_image
version, self.sys_homeassistant.default_image
)
self.sys_homeassistant.set_image(self.sys_homeassistant.default_image)
except DockerError:
@ -108,7 +108,7 @@ class HomeAssistantCore(JobGroup):
)
await self.install_landingpage()
else:
self.sys_homeassistant.version = self.instance.version
self.sys_homeassistant.version = self.instance.version or version
self.sys_homeassistant.set_image(self.instance.image)
await self.sys_homeassistant.save_data()
@ -182,12 +182,13 @@ class HomeAssistantCore(JobGroup):
if not self.sys_homeassistant.latest_version:
await self.sys_updater.reload()
if self.sys_homeassistant.latest_version:
if to_version := self.sys_homeassistant.latest_version:
try:
await self.instance.update(
self.sys_homeassistant.latest_version,
to_version,
image=self.sys_updater.image_homeassistant,
)
self.sys_homeassistant.version = self.instance.version or to_version
break
except (DockerError, JobException):
pass
@ -198,7 +199,6 @@ class HomeAssistantCore(JobGroup):
await asyncio.sleep(30)
_LOGGER.info("Home Assistant docker now installed")
self.sys_homeassistant.version = self.instance.version
self.sys_homeassistant.set_image(self.sys_updater.image_homeassistant)
await self.sys_homeassistant.save_data()
@ -231,8 +231,8 @@ class HomeAssistantCore(JobGroup):
backup: bool | None = False,
) -> None:
"""Update HomeAssistant version."""
version = version or self.sys_homeassistant.latest_version
if not version:
to_version = version or self.sys_homeassistant.latest_version
if not to_version:
raise HomeAssistantUpdateError(
"Cannot determine latest version of Home Assistant for update",
_LOGGER.error,
@ -243,9 +243,9 @@ class HomeAssistantCore(JobGroup):
running = await self.instance.is_running()
exists = await self.instance.exists()
if exists and version == self.instance.version:
if exists and to_version == self.instance.version:
raise HomeAssistantUpdateError(
f"Version {version!s} is already installed", _LOGGER.warning
f"Version {to_version!s} is already installed", _LOGGER.warning
)
if backup:
@ -268,7 +268,7 @@ class HomeAssistantCore(JobGroup):
"Updating Home Assistant image failed", _LOGGER.warning
) from err
self.sys_homeassistant.version = self.instance.version
self.sys_homeassistant.version = self.instance.version or to_version
self.sys_homeassistant.set_image(self.sys_updater.image_homeassistant)
if running:
@ -282,7 +282,7 @@ class HomeAssistantCore(JobGroup):
# Update Home Assistant
with suppress(HomeAssistantError):
await _update(version)
await _update(to_version)
if not self.error_state and rollback:
try:

View File

@ -175,7 +175,7 @@ class Interface:
)
return Interface(
name=inet.name,
name=inet.interface_name,
mac=inet.hw_address,
path=inet.path,
enabled=inet.settings is not None,
@ -286,7 +286,7 @@ class Interface:
_LOGGER.warning(
"Auth method %s for network interface %s unsupported, skipping",
inet.settings.wireless_security.key_mgmt,
inet.name,
inet.interface_name,
)
return None

View File

@ -22,6 +22,7 @@ from ..exceptions import (
AudioUpdateError,
ConfigurationFileError,
DockerError,
PluginError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
@ -127,7 +128,7 @@ class PluginAudio(PluginBase):
"""Update Audio plugin."""
try:
await super().update(version)
except DockerError as err:
except (DockerError, PluginError) as err:
raise AudioUpdateError("Audio update failed", _LOGGER.error) from err
async def restart(self) -> None:

View File

@ -168,14 +168,14 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
# Check plugin state
try:
# Evaluate Version if we lost this information
if not self.version:
self.version = await self.instance.get_latest_version()
if self.version:
version = self.version
else:
self.version = version = await self.instance.get_latest_version()
await self.instance.attach(
version=self.version, skip_state_event_if_down=True
)
await self.instance.attach(version=version, skip_state_event_if_down=True)
await self.instance.check_image(self.version, self.default_image)
await self.instance.check_image(version, self.default_image)
except DockerError:
_LOGGER.info(
"No %s plugin Docker image %s found.", self.slug, self.instance.image
@ -185,7 +185,7 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
with suppress(PluginError):
await self.install()
else:
self.version = self.instance.version
self.version = self.instance.version or version
self.image = self.default_image
await self.save_data()
@ -202,11 +202,10 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
if not self.latest_version:
await self.sys_updater.reload()
if self.latest_version:
if to_version := self.latest_version:
with suppress(DockerError):
await self.instance.install(
self.latest_version, image=self.default_image
)
await self.instance.install(to_version, image=self.default_image)
self.version = self.instance.version or to_version
break
_LOGGER.warning(
"Error on installing %s plugin, retrying in 30sec", self.slug
@ -214,23 +213,28 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
await asyncio.sleep(30)
_LOGGER.info("%s plugin now installed", self.slug)
self.version = self.instance.version
self.image = self.default_image
await self.save_data()
async def update(self, version: str | None = None) -> None:
"""Update system plugin."""
version = version or self.latest_version
to_version = AwesomeVersion(version) if version else self.latest_version
if not to_version:
raise PluginError(
f"Cannot determine latest version of plugin {self.slug} for update",
_LOGGER.error,
)
old_image = self.image
if version == self.version:
if to_version == self.version:
_LOGGER.warning(
"Version %s is already installed for %s", version, self.slug
"Version %s is already installed for %s", to_version, self.slug
)
return
await self.instance.update(version, image=self.default_image)
self.version = self.instance.version
await self.instance.update(to_version, image=self.default_image)
self.version = self.instance.version or to_version
self.image = self.default_image
await self.save_data()

View File

@ -15,7 +15,7 @@ from ..coresys import CoreSys
from ..docker.cli import DockerCli
from ..docker.const import ContainerState
from ..docker.stats import DockerStats
from ..exceptions import CliError, CliJobError, CliUpdateError, DockerError
from ..exceptions import CliError, CliJobError, CliUpdateError, DockerError, PluginError
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
from ..utils.sentry import async_capture_exception
@ -67,7 +67,7 @@ class PluginCli(PluginBase):
"""Update local HA cli."""
try:
await super().update(version)
except DockerError as err:
except (DockerError, PluginError) as err:
raise CliUpdateError("CLI update failed", _LOGGER.error) from err
async def start(self) -> None:

View File

@ -28,6 +28,7 @@ from ..exceptions import (
CoreDNSJobError,
CoreDNSUpdateError,
DockerError,
PluginError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
@ -217,7 +218,7 @@ class PluginDns(PluginBase):
"""Update CoreDNS plugin."""
try:
await super().update(version)
except DockerError as err:
except (DockerError, PluginError) as err:
raise CoreDNSUpdateError("CoreDNS update failed", _LOGGER.error) from err
async def restart(self) -> None:

View File

@ -16,6 +16,7 @@ from ..exceptions import (
MulticastError,
MulticastJobError,
MulticastUpdateError,
PluginError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
@ -63,7 +64,7 @@ class PluginMulticast(PluginBase):
"""Update Multicast plugin."""
try:
await super().update(version)
except DockerError as err:
except (DockerError, PluginError) as err:
raise MulticastUpdateError(
"Multicast update failed", _LOGGER.error
) from err

View File

@ -20,6 +20,7 @@ from ..exceptions import (
ObserverError,
ObserverJobError,
ObserverUpdateError,
PluginError,
)
from ..jobs.const import JobExecutionLimit
from ..jobs.decorator import Job
@ -72,7 +73,7 @@ class PluginObserver(PluginBase):
"""Update local HA observer."""
try:
await super().update(version)
except DockerError as err:
except (DockerError, PluginError) as err:
raise ObserverUpdateError(
"HA observer update failed", _LOGGER.error
) from err

View File

@ -19,12 +19,12 @@ class CheckNetworkInterfaceIPV4(CheckBase):
async def run_check(self) -> None:
"""Run check if not affected by issue."""
for interface in self.sys_dbus.network.interfaces:
if CheckNetworkInterfaceIPV4.check_interface(interface):
for inet in self.sys_dbus.network.interfaces:
if CheckNetworkInterfaceIPV4.check_interface(inet):
self.sys_resolution.create_issue(
IssueType.IPV4_CONNECTION_PROBLEM,
ContextType.SYSTEM,
interface.name,
inet.interface_name,
)
async def approve_check(self, reference: str | None = None) -> bool:

View File

@ -204,6 +204,12 @@ class Supervisor(CoreSysAttributes):
f"Version {version!s} is already installed", _LOGGER.warning
)
image = self.sys_updater.image_supervisor or self.instance.image
if not image:
raise SupervisorUpdateError(
"Cannot determine image to use for supervisor update!", _LOGGER.error
)
# First update own AppArmor
try:
await self.update_apparmor()
@ -216,12 +222,8 @@ class Supervisor(CoreSysAttributes):
# Update container
_LOGGER.info("Update Supervisor to version %s", version)
try:
await self.instance.install(
version, image=self.sys_updater.image_supervisor
)
await self.instance.update_start_tag(
self.sys_updater.image_supervisor, version
)
await self.instance.install(version, image=image)
await self.instance.update_start_tag(image, version)
except DockerError as err:
self.sys_resolution.create_issue(
IssueType.UPDATE_FAILED, ContextType.SUPERVISOR
@ -232,7 +234,7 @@ class Supervisor(CoreSysAttributes):
) from err
self.sys_config.version = version
self.sys_config.image = self.sys_updater.image_supervisor
self.sys_config.image = image
await self.sys_config.save_data()
self.sys_create_task(self.sys_core.stop())

View File

@ -137,25 +137,24 @@ async def test_auth_json_success(
@pytest.mark.parametrize(
("user", "password", "message", "api_client"),
("user", "password", "api_client"),
[
(None, "password", "None as username is not supported!", TEST_ADDON_SLUG),
("user", None, "None as password is not supported!", TEST_ADDON_SLUG),
(None, "password", TEST_ADDON_SLUG),
("user", None, TEST_ADDON_SLUG),
],
indirect=["api_client"],
)
async def test_auth_json_failure_none(
api_client: TestClient,
mock_check_login: AsyncMock,
install_addon_ssh: Addon,
user: str | None,
password: str | None,
message: str,
):
"""Test failed JSON auth with none user or password."""
mock_check_login.return_value = True
resp = await api_client.post("/auth", json={"username": user, "password": password})
assert resp.status == 400
body = await resp.json()
assert body["message"] == message
assert resp.status == 401
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
@ -177,7 +176,7 @@ async def test_auth_json_empty_body(api_client: TestClient, install_addon_ssh: A
resp = await api_client.post(
"/auth", data="", headers={"Content-Type": "application/json"}
)
assert resp.status == 400
assert resp.status == 401
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)

View File

@ -55,13 +55,13 @@ async def test_network_interface_ethernet(
interface = NetworkInterface("/org/freedesktop/NetworkManager/Devices/1")
assert interface.sync_properties is False
assert interface.name is None
assert interface.interface_name is None
assert interface.type is None
await interface.connect(dbus_session_bus)
assert interface.sync_properties is True
assert interface.name == TEST_INTERFACE_ETH_NAME
assert interface.interface_name == TEST_INTERFACE_ETH_NAME
assert interface.type == DeviceType.ETHERNET
assert interface.managed is True
assert interface.wireless is None
@ -108,7 +108,7 @@ async def test_network_interface_wlan(
await interface.connect(dbus_session_bus)
assert interface.sync_properties is True
assert interface.name == TEST_INTERFACE_WLAN_NAME
assert interface.interface_name == TEST_INTERFACE_WLAN_NAME
assert interface.type == DeviceType.WIRELESS
assert interface.wireless is not None
assert interface.wireless.bitrate == 0

View File

@ -76,6 +76,10 @@ async def test_connectivity_check_throttling(
async def test_update_failed(coresys: CoreSys, capture_exception: Mock):
"""Test update failure."""
# pylint: disable-next=protected-access
coresys.updater._data.setdefault("image", {})["supervisor"] = (
"ghcr.io/home-assistant/aarch64-hassio-supervisor"
)
err = DockerError()
with (
patch.object(DockerSupervisor, "install", side_effect=err),