diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index 3d6e14156..bd35baea4 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -736,7 +736,7 @@ class Addon(AddonModel): if self.sys_host.apparmor.exists(self.slug): profile = temp_path.joinpath("apparmor.txt") try: - self.sys_host.apparmor.backup_profile(self.slug, profile) + await self.sys_host.apparmor.backup_profile(self.slug, profile) except HostAppArmorError as err: raise AddonsError( "Can't backup AppArmor profile", _LOGGER.error diff --git a/supervisor/bootstrap.py b/supervisor/bootstrap.py index b31849234..40264ef00 100644 --- a/supervisor/bootstrap.py +++ b/supervisor/bootstrap.py @@ -174,13 +174,20 @@ def initialize_system(coresys: CoreSys) -> None: _LOGGER.debug("Creating Supervisor share folder at '%s'", config.path_share) config.path_share.mkdir() - # Apparmor folder + # Apparmor folders if not config.path_apparmor.is_dir(): _LOGGER.debug( - "Creating Supervisor Apparmor folder at '%s'", config.path_apparmor + "Creating Supervisor Apparmor Profile folder at '%s'", config.path_apparmor ) config.path_apparmor.mkdir() + if not config.path_apparmor_cache.is_dir(): + _LOGGER.debug( + "Creating Supervisor Apparmor Cache folder at '%s'", + config.path_apparmor_cache, + ) + config.path_apparmor_cache.mkdir() + # DNS folder if not config.path_dns.is_dir(): _LOGGER.debug("Creating Supervisor DNS folder at '%s'", config.path_dns) diff --git a/supervisor/config.py b/supervisor/config.py index 35c092f6c..ba693ec85 100644 --- a/supervisor/config.py +++ b/supervisor/config.py @@ -42,6 +42,7 @@ BACKUP_DATA = PurePath("backup") SHARE_DATA = PurePath("share") TMP_DATA = PurePath("tmp") APPARMOR_DATA = PurePath("apparmor") +APPARMOR_CACHE = PurePath("apparmor/cache") DNS_DATA = PurePath("dns") AUDIO_DATA = PurePath("audio") MEDIA_DATA = PurePath("media") @@ -268,6 +269,21 @@ class CoreConfig(FileConfiguration): """Return root Apparmor profile folder.""" return Path(SUPERVISOR_DATA, APPARMOR_DATA) + @property + def path_apparmor_cache(self) -> Path: + """Return root Apparmor cache folder.""" + return Path(SUPERVISOR_DATA, APPARMOR_CACHE) + + @property + def path_extern_apparmor(self) -> Path: + """Return root Apparmor profile folder external.""" + return Path(self.path_extern_supervisor, APPARMOR_DATA) + + @property + def path_extern_apparmor_cache(self) -> Path: + """Return root Apparmor cache folder external.""" + return Path(self.path_extern_supervisor, APPARMOR_CACHE) + @property def path_extern_share(self) -> PurePath: """Return root share data folder external for Docker.""" diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index 956a2a0c9..be13f5776 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -194,8 +194,10 @@ class DockerAddon(DockerInterface): security = super().security_opt # AppArmor - apparmor = self.sys_host.apparmor.available - if not apparmor or self.addon.apparmor == SECURITY_DISABLE: + if ( + not self.sys_host.apparmor.available + or self.addon.apparmor == SECURITY_DISABLE + ): security.append("apparmor=unconfined") elif self.addon.apparmor == SECURITY_PROFILE: security.append(f"apparmor={self.addon.slug}") diff --git a/supervisor/host/apparmor.py b/supervisor/host/apparmor.py index bc185b829..f09888a69 100644 --- a/supervisor/host/apparmor.py +++ b/supervisor/host/apparmor.py @@ -3,41 +3,37 @@ import logging from pathlib import Path import shutil -from ..coresys import CoreSysAttributes +from supervisor.resolution.const import UnsupportedReason + +from ..coresys import CoreSys, CoreSysAttributes from ..exceptions import DBusError, HostAppArmorError from ..utils.apparmor import validate_profile +from .const import HostFeature _LOGGER: logging.Logger = logging.getLogger(__name__) -SYSTEMD_SERVICES = {"hassos-apparmor.service", "hassio-apparmor.service"} - class AppArmorControl(CoreSysAttributes): """Handle host AppArmor controls.""" - def __init__(self, coresys): + def __init__(self, coresys: CoreSys): """Initialize host power handling.""" - self.coresys = coresys - self._profiles = set() - self._service = None + self.coresys: CoreSys = coresys + self._profiles: set[str] = set() @property - def available(self): + def available(self) -> bool: """Return True if AppArmor is available on host.""" - return self._service is not None + return ( + HostFeature.OS_AGENT in self.sys_host.features + and UnsupportedReason.APPARMOR not in self.sys_resolution.unsupported + ) - def exists(self, profile): + def exists(self, profile_name: str) -> bool: """Return True if a profile exists.""" - return profile in self._profiles + return profile_name in self._profiles - async def _reload_service(self): - """Reload internal service.""" - try: - await self.sys_host.services.reload(self._service) - except DBusError as err: - _LOGGER.error("Can't reload %s: %s", self._service, err) - - def _get_profile(self, profile_name): + def _get_profile(self, profile_name: str) -> Path: """Get a profile from AppArmor store.""" if profile_name not in self._profiles: raise HostAppArmorError( @@ -45,27 +41,26 @@ class AppArmorControl(CoreSysAttributes): ) return Path(self.sys_config.path_apparmor, profile_name) - async def load(self): + async def load(self) -> None: """Load available profiles.""" for content in self.sys_config.path_apparmor.iterdir(): if not content.is_file(): continue self._profiles.add(content.name) - # Is connected with systemd? _LOGGER.info("Loading AppArmor Profiles: %s", self._profiles) - for service in SYSTEMD_SERVICES: - if not self.sys_host.services.exists(service): - continue - self._service = service # Load profiles if self.available: - await self._reload_service() + for profile_name in self._profiles: + try: + await self._load_profile(profile_name) + except HostAppArmorError: + pass else: - _LOGGER.info("AppArmor is not enabled on host") + _LOGGER.warning("AppArmor is not enabled on host") - async def load_profile(self, profile_name, profile_file): + async def load_profile(self, profile_name: str, profile_file: Path) -> None: """Load/Update a new/exists profile into AppArmor.""" if not validate_profile(profile_name, profile_file): raise HostAppArmorError( @@ -73,9 +68,9 @@ class AppArmorControl(CoreSysAttributes): ) # Copy to AppArmor folder - dest_profile = Path(self.sys_config.path_apparmor, profile_name) + dest_profile: Path = Path(self.sys_config.path_apparmor, profile_name) try: - shutil.copyfile(profile_file, dest_profile) + await self.sys_run_in_executor(shutil.copyfile, profile_file, dest_profile) except OSError as err: raise HostAppArmorError( f"Can't copy {profile_file}: {err}", _LOGGER.error @@ -84,43 +79,60 @@ class AppArmorControl(CoreSysAttributes): # Load profiles _LOGGER.info("Adding/updating AppArmor profile: %s", profile_name) self._profiles.add(profile_name) - if self.available: - await self._reload_service() - - async def remove_profile(self, profile_name): - """Remove a AppArmor profile.""" - profile_file = self._get_profile(profile_name) - - # Only remove file if not self.available: - try: - profile_file.unlink() - except OSError as err: - raise HostAppArmorError( - f"Can't remove profile: {err}", _LOGGER.error - ) from err return - # Marks als remove and start host process - remove_profile = Path(self.sys_config.path_apparmor, "remove", profile_name) + await self._load_profile(profile_name) + + async def remove_profile(self, profile_name: str) -> None: + """Remove a AppArmor profile.""" + profile_file: Path = self._get_profile(profile_name) + + # Unload if apparmor is enabled + if self.available: + await self._unload_profile(profile_name) + try: - profile_file.rename(remove_profile) + await self.sys_run_in_executor(profile_file.unlink) except OSError as err: raise HostAppArmorError( - f"Can't mark profile as remove: {err}", _LOGGER.error + f"Can't remove profile: {err}", _LOGGER.error ) from err _LOGGER.info("Removing AppArmor profile: %s", profile_name) self._profiles.remove(profile_name) - await self._reload_service() - def backup_profile(self, profile_name, backup_file): + async def backup_profile(self, profile_name: str, backup_file: Path) -> None: """Backup A profile into a new file.""" - profile_file = self._get_profile(profile_name) + profile_file: Path = self._get_profile(profile_name) try: - shutil.copy(profile_file, backup_file) + await self.sys_run_in_executor(shutil.copy, profile_file, backup_file) except OSError as err: raise HostAppArmorError( f"Can't backup profile {profile_name}: {err}", _LOGGER.error ) from err + + async def _load_profile(self, profile_name: str) -> None: + """Load a profile on the host.""" + try: + await self.sys_dbus.agent.apparmor.load_profile( + self.sys_config.path_extern_apparmor.joinpath(profile_name), + self.sys_config.path_apparmor_cache, + ) + except DBusError as err: + raise HostAppArmorError( + f"Can't load profile {profile_name}: {err!s}", _LOGGER.error + ) from err + + async def _unload_profile(self, profile_name: str) -> None: + """Unload a profile on the host.""" + try: + await self.sys_dbus.agent.apparmor.unload_profile( + self.sys_config.path_extern_apparmor.joinpath(profile_name), + self.sys_config.path_apparmor_cache, + ) + except DBusError as err: + raise HostAppArmorError( + f"Can't unload profile {profile_name}: {err!s}", _LOGGER.error + ) from err