Using AppArmor from OS-Agent (#3254)

* Using AppArmor from OS-Agent

* cleanup
This commit is contained in:
Pascal Vizeli 2021-10-27 12:55:31 +02:00 committed by GitHub
parent 8653f7a0e1
commit d80d76a24d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 57 deletions

View File

@ -736,7 +736,7 @@ class Addon(AddonModel):
if self.sys_host.apparmor.exists(self.slug):
profile = temp_path.joinpath("apparmor.txt")
try:
self.sys_host.apparmor.backup_profile(self.slug, profile)
await self.sys_host.apparmor.backup_profile(self.slug, profile)
except HostAppArmorError as err:
raise AddonsError(
"Can't backup AppArmor profile", _LOGGER.error

View File

@ -174,13 +174,20 @@ def initialize_system(coresys: CoreSys) -> None:
_LOGGER.debug("Creating Supervisor share folder at '%s'", config.path_share)
config.path_share.mkdir()
# Apparmor folder
# Apparmor folders
if not config.path_apparmor.is_dir():
_LOGGER.debug(
"Creating Supervisor Apparmor folder at '%s'", config.path_apparmor
"Creating Supervisor Apparmor Profile folder at '%s'", config.path_apparmor
)
config.path_apparmor.mkdir()
if not config.path_apparmor_cache.is_dir():
_LOGGER.debug(
"Creating Supervisor Apparmor Cache folder at '%s'",
config.path_apparmor_cache,
)
config.path_apparmor_cache.mkdir()
# DNS folder
if not config.path_dns.is_dir():
_LOGGER.debug("Creating Supervisor DNS folder at '%s'", config.path_dns)

View File

@ -42,6 +42,7 @@ BACKUP_DATA = PurePath("backup")
SHARE_DATA = PurePath("share")
TMP_DATA = PurePath("tmp")
APPARMOR_DATA = PurePath("apparmor")
APPARMOR_CACHE = PurePath("apparmor/cache")
DNS_DATA = PurePath("dns")
AUDIO_DATA = PurePath("audio")
MEDIA_DATA = PurePath("media")
@ -268,6 +269,21 @@ class CoreConfig(FileConfiguration):
"""Return root Apparmor profile folder."""
return Path(SUPERVISOR_DATA, APPARMOR_DATA)
@property
def path_apparmor_cache(self) -> Path:
"""Return root Apparmor cache folder."""
return Path(SUPERVISOR_DATA, APPARMOR_CACHE)
@property
def path_extern_apparmor(self) -> Path:
"""Return root Apparmor profile folder external."""
return Path(self.path_extern_supervisor, APPARMOR_DATA)
@property
def path_extern_apparmor_cache(self) -> Path:
"""Return root Apparmor cache folder external."""
return Path(self.path_extern_supervisor, APPARMOR_CACHE)
@property
def path_extern_share(self) -> PurePath:
"""Return root share data folder external for Docker."""

View File

@ -194,8 +194,10 @@ class DockerAddon(DockerInterface):
security = super().security_opt
# AppArmor
apparmor = self.sys_host.apparmor.available
if not apparmor or self.addon.apparmor == SECURITY_DISABLE:
if (
not self.sys_host.apparmor.available
or self.addon.apparmor == SECURITY_DISABLE
):
security.append("apparmor=unconfined")
elif self.addon.apparmor == SECURITY_PROFILE:
security.append(f"apparmor={self.addon.slug}")

View File

@ -3,41 +3,37 @@ import logging
from pathlib import Path
import shutil
from ..coresys import CoreSysAttributes
from supervisor.resolution.const import UnsupportedReason
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import DBusError, HostAppArmorError
from ..utils.apparmor import validate_profile
from .const import HostFeature
_LOGGER: logging.Logger = logging.getLogger(__name__)
SYSTEMD_SERVICES = {"hassos-apparmor.service", "hassio-apparmor.service"}
class AppArmorControl(CoreSysAttributes):
"""Handle host AppArmor controls."""
def __init__(self, coresys):
def __init__(self, coresys: CoreSys):
"""Initialize host power handling."""
self.coresys = coresys
self._profiles = set()
self._service = None
self.coresys: CoreSys = coresys
self._profiles: set[str] = set()
@property
def available(self):
def available(self) -> bool:
"""Return True if AppArmor is available on host."""
return self._service is not None
return (
HostFeature.OS_AGENT in self.sys_host.features
and UnsupportedReason.APPARMOR not in self.sys_resolution.unsupported
)
def exists(self, profile):
def exists(self, profile_name: str) -> bool:
"""Return True if a profile exists."""
return profile in self._profiles
return profile_name in self._profiles
async def _reload_service(self):
"""Reload internal service."""
try:
await self.sys_host.services.reload(self._service)
except DBusError as err:
_LOGGER.error("Can't reload %s: %s", self._service, err)
def _get_profile(self, profile_name):
def _get_profile(self, profile_name: str) -> Path:
"""Get a profile from AppArmor store."""
if profile_name not in self._profiles:
raise HostAppArmorError(
@ -45,27 +41,26 @@ class AppArmorControl(CoreSysAttributes):
)
return Path(self.sys_config.path_apparmor, profile_name)
async def load(self):
async def load(self) -> None:
"""Load available profiles."""
for content in self.sys_config.path_apparmor.iterdir():
if not content.is_file():
continue
self._profiles.add(content.name)
# Is connected with systemd?
_LOGGER.info("Loading AppArmor Profiles: %s", self._profiles)
for service in SYSTEMD_SERVICES:
if not self.sys_host.services.exists(service):
continue
self._service = service
# Load profiles
if self.available:
await self._reload_service()
for profile_name in self._profiles:
try:
await self._load_profile(profile_name)
except HostAppArmorError:
pass
else:
_LOGGER.info("AppArmor is not enabled on host")
_LOGGER.warning("AppArmor is not enabled on host")
async def load_profile(self, profile_name, profile_file):
async def load_profile(self, profile_name: str, profile_file: Path) -> None:
"""Load/Update a new/exists profile into AppArmor."""
if not validate_profile(profile_name, profile_file):
raise HostAppArmorError(
@ -73,9 +68,9 @@ class AppArmorControl(CoreSysAttributes):
)
# Copy to AppArmor folder
dest_profile = Path(self.sys_config.path_apparmor, profile_name)
dest_profile: Path = Path(self.sys_config.path_apparmor, profile_name)
try:
shutil.copyfile(profile_file, dest_profile)
await self.sys_run_in_executor(shutil.copyfile, profile_file, dest_profile)
except OSError as err:
raise HostAppArmorError(
f"Can't copy {profile_file}: {err}", _LOGGER.error
@ -84,43 +79,60 @@ class AppArmorControl(CoreSysAttributes):
# Load profiles
_LOGGER.info("Adding/updating AppArmor profile: %s", profile_name)
self._profiles.add(profile_name)
if self.available:
await self._reload_service()
async def remove_profile(self, profile_name):
"""Remove a AppArmor profile."""
profile_file = self._get_profile(profile_name)
# Only remove file
if not self.available:
return
await self._load_profile(profile_name)
async def remove_profile(self, profile_name: str) -> None:
"""Remove a AppArmor profile."""
profile_file: Path = self._get_profile(profile_name)
# Unload if apparmor is enabled
if self.available:
await self._unload_profile(profile_name)
try:
profile_file.unlink()
await self.sys_run_in_executor(profile_file.unlink)
except OSError as err:
raise HostAppArmorError(
f"Can't remove profile: {err}", _LOGGER.error
) from err
return
# Marks als remove and start host process
remove_profile = Path(self.sys_config.path_apparmor, "remove", profile_name)
try:
profile_file.rename(remove_profile)
except OSError as err:
raise HostAppArmorError(
f"Can't mark profile as remove: {err}", _LOGGER.error
) from err
_LOGGER.info("Removing AppArmor profile: %s", profile_name)
self._profiles.remove(profile_name)
await self._reload_service()
def backup_profile(self, profile_name, backup_file):
async def backup_profile(self, profile_name: str, backup_file: Path) -> None:
"""Backup A profile into a new file."""
profile_file = self._get_profile(profile_name)
profile_file: Path = self._get_profile(profile_name)
try:
shutil.copy(profile_file, backup_file)
await self.sys_run_in_executor(shutil.copy, profile_file, backup_file)
except OSError as err:
raise HostAppArmorError(
f"Can't backup profile {profile_name}: {err}", _LOGGER.error
) from err
async def _load_profile(self, profile_name: str) -> None:
"""Load a profile on the host."""
try:
await self.sys_dbus.agent.apparmor.load_profile(
self.sys_config.path_extern_apparmor.joinpath(profile_name),
self.sys_config.path_apparmor_cache,
)
except DBusError as err:
raise HostAppArmorError(
f"Can't load profile {profile_name}: {err!s}", _LOGGER.error
) from err
async def _unload_profile(self, profile_name: str) -> None:
"""Unload a profile on the host."""
try:
await self.sys_dbus.agent.apparmor.unload_profile(
self.sys_config.path_extern_apparmor.joinpath(profile_name),
self.sys_config.path_apparmor_cache,
)
except DBusError as err:
raise HostAppArmorError(
f"Can't unload profile {profile_name}: {err!s}", _LOGGER.error
) from err