Files
supervisor/supervisor/host/apparmor.py
Mike Degatano 0636e49fe2 Enable mypy part 1 (addons and api) (#5759)
* Fix mypy issues in addons

* Fix mypy issues in api

* fix docstring

* Brackets instead of get with default
2025-03-25 15:06:35 -04:00

168 lines
5.9 KiB
Python

"""AppArmor control for host."""
from __future__ import annotations
from contextlib import suppress
import errno
import logging
from pathlib import Path
import shutil
from awesomeversion import AwesomeVersion
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import DBusError, HostAppArmorError
from ..resolution.const import UnhealthyReason, UnsupportedReason
from ..utils.apparmor import validate_profile
from .const import HostFeature
_LOGGER: logging.Logger = logging.getLogger(__name__)
class AppArmorControl(CoreSysAttributes):
"""Handle host AppArmor controls."""
def __init__(self, coresys: CoreSys):
"""Initialize host power handling."""
self.coresys: CoreSys = coresys
self._profiles: set[str] = set()
@property
def available(self) -> bool:
"""Return True if AppArmor is available on host."""
return (
HostFeature.OS_AGENT in self.sys_host.features
and UnsupportedReason.APPARMOR not in self.sys_resolution.unsupported
)
@property
def version(self) -> AwesomeVersion | None:
"""Return hosts AppArmor Version."""
return self.sys_dbus.agent.apparmor.version
def exists(self, profile_name: str) -> bool:
"""Return True if a profile exists."""
return profile_name in self._profiles
def _get_profile(self, profile_name: str) -> Path:
"""Get a profile from AppArmor store."""
if profile_name not in self._profiles:
raise HostAppArmorError(
f"Can't find {profile_name} for removing", _LOGGER.error
)
return Path(self.sys_config.path_apparmor, profile_name)
async def load(self) -> None:
"""Load available profiles."""
def find_profiles() -> set[str]:
profiles: set[str] = set()
for content in self.sys_config.path_apparmor.iterdir():
if not content.is_file():
continue
profiles.add(content.name)
return profiles
self._profiles = await self.sys_run_in_executor(find_profiles)
_LOGGER.info("Loading AppArmor Profiles: %s", self._profiles)
# Load profiles
if self.available:
for profile_name in self._profiles:
with suppress(HostAppArmorError):
await self._load_profile(profile_name)
else:
_LOGGER.warning("AppArmor is not enabled on host")
async def load_profile(self, profile_name: str, profile_file: Path) -> None:
"""Load/Update a new/exists profile into AppArmor."""
if not await self.sys_run_in_executor(
validate_profile, profile_name, profile_file
):
raise HostAppArmorError(
f"AppArmor profile '{profile_name}' is not valid", _LOGGER.error
)
# Copy to AppArmor folder
dest_profile: Path = Path(self.sys_config.path_apparmor, profile_name)
try:
await self.sys_run_in_executor(shutil.copyfile, profile_file, dest_profile)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.add_unhealthy_reason(
UnhealthyReason.OSERROR_BAD_MESSAGE
)
raise HostAppArmorError(
f"Can't copy {profile_file}: {err}", _LOGGER.error
) from err
# Load profiles
_LOGGER.info("Adding/updating AppArmor profile: %s", profile_name)
self._profiles.add(profile_name)
if not self.available:
return
await self._load_profile(profile_name)
async def remove_profile(self, profile_name: str) -> None:
"""Remove a AppArmor profile."""
profile_file: Path = self._get_profile(profile_name)
# Unload if apparmor is enabled
if self.available:
await self._unload_profile(profile_name)
try:
await self.sys_run_in_executor(profile_file.unlink)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.add_unhealthy_reason(
UnhealthyReason.OSERROR_BAD_MESSAGE
)
raise HostAppArmorError(
f"Can't remove profile: {err}", _LOGGER.error
) from err
_LOGGER.info("Removing AppArmor profile: %s", profile_name)
self._profiles.remove(profile_name)
def backup_profile(self, profile_name: str, backup_file: Path) -> None:
"""Backup A profile into a new file."""
profile_file: Path = self._get_profile(profile_name)
try:
shutil.copy(profile_file, backup_file)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.add_unhealthy_reason(
UnhealthyReason.OSERROR_BAD_MESSAGE
)
raise HostAppArmorError(
f"Can't backup profile {profile_name}: {err}", _LOGGER.error
) from err
async def _load_profile(self, profile_name: str) -> None:
"""Load a profile on the host."""
try:
await self.sys_dbus.agent.apparmor.load_profile(
self.sys_config.path_extern_apparmor.joinpath(profile_name),
self.sys_config.path_extern_apparmor_cache,
)
except DBusError as err:
raise HostAppArmorError(
f"Can't load profile {profile_name}: {err!s}", _LOGGER.error
) from err
async def _unload_profile(self, profile_name: str) -> None:
"""Unload a profile on the host."""
try:
await self.sys_dbus.agent.apparmor.unload_profile(
self.sys_config.path_extern_apparmor.joinpath(profile_name),
self.sys_config.path_extern_apparmor_cache,
)
except DBusError as err:
raise HostAppArmorError(
f"Can't unload profile {profile_name}: {err!s}", _LOGGER.error
) from err