mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-19 07:06:30 +00:00
Improve error handling when mounts fail (#4872)
This commit is contained in:
parent
7fd6dce55f
commit
4c573991d2
@ -15,7 +15,12 @@ from ..const import (
|
||||
CoreState,
|
||||
)
|
||||
from ..dbus.const import UnitActiveState
|
||||
from ..exceptions import BackupError, BackupInvalidError, BackupJobError
|
||||
from ..exceptions import (
|
||||
BackupError,
|
||||
BackupInvalidError,
|
||||
BackupJobError,
|
||||
BackupMountDownError,
|
||||
)
|
||||
from ..jobs.const import JOB_GROUP_BACKUP_MANAGER, JobCondition, JobExecutionLimit
|
||||
from ..jobs.decorator import Job
|
||||
from ..jobs.job_group import JobGroup
|
||||
@ -74,11 +79,15 @@ class BackupManager(FileConfiguration, JobGroup):
|
||||
|
||||
def _get_base_path(self, location: Mount | type[DEFAULT] | None = DEFAULT) -> Path:
|
||||
"""Get base path for backup using location or default location."""
|
||||
if location:
|
||||
return location.local_where
|
||||
|
||||
if location == DEFAULT and self.sys_mounts.default_backup_mount:
|
||||
return self.sys_mounts.default_backup_mount.local_where
|
||||
location = self.sys_mounts.default_backup_mount
|
||||
|
||||
if location:
|
||||
if not location.local_where.is_mount():
|
||||
raise BackupMountDownError(
|
||||
f"{location.name} is down, cannot back-up to it", _LOGGER.error
|
||||
)
|
||||
return location.local_where
|
||||
|
||||
return self.sys_config.path_backup
|
||||
|
||||
|
@ -607,6 +607,10 @@ class BackupInvalidError(BackupError):
|
||||
"""Raise if backup or password provided is invalid."""
|
||||
|
||||
|
||||
class BackupMountDownError(BackupError):
|
||||
"""Raise if mount specified for backup is down."""
|
||||
|
||||
|
||||
class BackupJobError(BackupError, JobException):
|
||||
"""Raise on Backup job error."""
|
||||
|
||||
|
@ -8,6 +8,7 @@ FILE_CONFIG_MOUNTS = PurePath("mounts.json")
|
||||
ATTR_DEFAULT_BACKUP_MOUNT = "default_backup_mount"
|
||||
ATTR_MOUNTS = "mounts"
|
||||
ATTR_PATH = "path"
|
||||
ATTR_READ_ONLY = "read_only"
|
||||
ATTR_SERVER = "server"
|
||||
ATTR_SHARE = "share"
|
||||
ATTR_USAGE = "usage"
|
||||
|
@ -297,6 +297,7 @@ class MountManager(FileConfiguration, CoreSysAttributes):
|
||||
name=f"{'emergency' if emergency else 'bind'}_{mount.name}",
|
||||
path=path,
|
||||
where=where,
|
||||
read_only=emergency,
|
||||
),
|
||||
emergency=emergency,
|
||||
)
|
||||
|
@ -39,6 +39,7 @@ from ..resolution.data import Issue
|
||||
from ..utils.sentry import capture_exception
|
||||
from .const import (
|
||||
ATTR_PATH,
|
||||
ATTR_READ_ONLY,
|
||||
ATTR_SERVER,
|
||||
ATTR_SHARE,
|
||||
ATTR_USAGE,
|
||||
@ -81,7 +82,9 @@ class Mount(CoreSysAttributes, ABC):
|
||||
|
||||
def to_dict(self, *, skip_secrets: bool = True) -> MountData:
|
||||
"""Return dictionary representation."""
|
||||
return MountData(name=self.name, type=self.type, usage=self.usage)
|
||||
return MountData(
|
||||
name=self.name, type=self.type, usage=self.usage, read_only=self.read_only
|
||||
)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
@ -102,6 +105,11 @@ class Mount(CoreSysAttributes, ABC):
|
||||
else None
|
||||
)
|
||||
|
||||
@property
|
||||
def read_only(self) -> bool:
|
||||
"""Is mount read-only."""
|
||||
return self._data.get(ATTR_READ_ONLY, False)
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def what(self) -> str:
|
||||
@ -113,9 +121,9 @@ class Mount(CoreSysAttributes, ABC):
|
||||
"""Where to mount (on host)."""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def options(self) -> list[str]:
|
||||
"""List of options to use to mount."""
|
||||
return ["ro"] if self.read_only else []
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
@ -358,7 +366,10 @@ class NetworkMount(Mount, ABC):
|
||||
@property
|
||||
def options(self) -> list[str]:
|
||||
"""Options to use to mount."""
|
||||
return [f"port={self.port}"] if self.port else []
|
||||
options = super().options
|
||||
if self.port:
|
||||
options.append(f"port={self.port}")
|
||||
return options
|
||||
|
||||
|
||||
class CIFSMount(NetworkMount):
|
||||
@ -484,6 +495,7 @@ class BindMount(Mount):
|
||||
path: Path,
|
||||
usage: MountUsage | None = None,
|
||||
where: PurePath | None = None,
|
||||
read_only: bool = False,
|
||||
) -> "BindMount":
|
||||
"""Create a new bind mount instance."""
|
||||
return BindMount(
|
||||
@ -493,6 +505,7 @@ class BindMount(Mount):
|
||||
type=MountType.BIND,
|
||||
path=path.as_posix(),
|
||||
usage=usage and usage,
|
||||
read_only=read_only,
|
||||
),
|
||||
where=where,
|
||||
)
|
||||
@ -523,4 +536,4 @@ class BindMount(Mount):
|
||||
@property
|
||||
def options(self) -> list[str]:
|
||||
"""List of options to use to mount."""
|
||||
return ["bind"]
|
||||
return super().options + ["bind"]
|
||||
|
@ -1,7 +1,7 @@
|
||||
"""Validation for mount manager."""
|
||||
|
||||
import re
|
||||
from typing import NotRequired, TypedDict
|
||||
from typing import Any, NotRequired, TypedDict
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
@ -18,6 +18,7 @@ from .const import (
|
||||
ATTR_DEFAULT_BACKUP_MOUNT,
|
||||
ATTR_MOUNTS,
|
||||
ATTR_PATH,
|
||||
ATTR_READ_ONLY,
|
||||
ATTR_SERVER,
|
||||
ATTR_SHARE,
|
||||
ATTR_USAGE,
|
||||
@ -26,6 +27,18 @@ from .const import (
|
||||
MountUsage,
|
||||
)
|
||||
|
||||
|
||||
def usage_specific_validation(config: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Validate config based on usage."""
|
||||
# Backup mounts cannot be read only
|
||||
if config[ATTR_USAGE] == MountUsage.BACKUP and config[ATTR_READ_ONLY]:
|
||||
raise vol.Invalid("Backup mounts cannot be read only")
|
||||
|
||||
return config
|
||||
|
||||
|
||||
# pylint: disable=no-value-for-parameter
|
||||
|
||||
RE_MOUNT_NAME = re.compile(r"^[A-Za-z0-9_]+$")
|
||||
RE_PATH_PART = re.compile(r"^[^\\\/]+")
|
||||
RE_MOUNT_OPTION = re.compile(r"^[^,=]+$")
|
||||
@ -41,6 +54,7 @@ _SCHEMA_BASE_MOUNT_CONFIG = vol.Schema(
|
||||
vol.In([MountType.CIFS.value, MountType.NFS.value]), vol.Coerce(MountType)
|
||||
),
|
||||
vol.Required(ATTR_USAGE): vol.Coerce(MountUsage),
|
||||
vol.Optional(ATTR_READ_ONLY, default=False): vol.Boolean(),
|
||||
},
|
||||
extra=vol.REMOVE_EXTRA,
|
||||
)
|
||||
@ -71,7 +85,9 @@ SCHEMA_MOUNT_NFS = _SCHEMA_MOUNT_NETWORK.extend(
|
||||
}
|
||||
)
|
||||
|
||||
SCHEMA_MOUNT_CONFIG = vol.Any(SCHEMA_MOUNT_CIFS, SCHEMA_MOUNT_NFS)
|
||||
SCHEMA_MOUNT_CONFIG = vol.All(
|
||||
vol.Any(SCHEMA_MOUNT_CIFS, SCHEMA_MOUNT_NFS), usage_specific_validation
|
||||
)
|
||||
|
||||
SCHEMA_MOUNTS_CONFIG = vol.Schema(
|
||||
{
|
||||
@ -86,6 +102,7 @@ class MountData(TypedDict):
|
||||
|
||||
name: str
|
||||
type: str
|
||||
read_only: bool
|
||||
usage: NotRequired[str]
|
||||
|
||||
# CIFS and NFS fields
|
||||
|
@ -90,13 +90,14 @@ async def test_backup_to_location(
|
||||
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
resp = await api_client.post(
|
||||
"/backups/new/full",
|
||||
json={
|
||||
"name": "Mount test",
|
||||
"location": location,
|
||||
},
|
||||
)
|
||||
with patch("supervisor.mounts.mount.Path.is_mount", return_value=True):
|
||||
resp = await api_client.post(
|
||||
"/backups/new/full",
|
||||
json={
|
||||
"name": "Mount test",
|
||||
"location": location,
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
slug = result["data"]["slug"]
|
||||
@ -134,10 +135,11 @@ async def test_backup_to_default(
|
||||
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
resp = await api_client.post(
|
||||
"/backups/new/full",
|
||||
json={"name": "Mount test"},
|
||||
)
|
||||
with patch("supervisor.mounts.mount.Path.is_mount", return_value=True):
|
||||
resp = await api_client.post(
|
||||
"/backups/new/full",
|
||||
json={"name": "Mount test"},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
slug = result["data"]["slug"]
|
||||
|
@ -79,6 +79,7 @@ async def test_api_create_mount(
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
"state": "active",
|
||||
"read_only": False,
|
||||
}
|
||||
]
|
||||
coresys.mounts.save_data.assert_called_once()
|
||||
@ -253,6 +254,7 @@ async def test_api_update_mount(
|
||||
"server": "backup.local",
|
||||
"share": "new_backups",
|
||||
"state": "active",
|
||||
"read_only": False,
|
||||
}
|
||||
]
|
||||
coresys.mounts.save_data.assert_called_once()
|
||||
@ -320,6 +322,7 @@ async def test_api_update_dbus_error_mount_remains(
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
"state": None,
|
||||
"read_only": False,
|
||||
}
|
||||
]
|
||||
|
||||
@ -366,6 +369,7 @@ async def test_api_update_dbus_error_mount_remains(
|
||||
"server": "backup.local",
|
||||
"share": "backups",
|
||||
"state": None,
|
||||
"read_only": False,
|
||||
}
|
||||
]
|
||||
|
||||
@ -776,3 +780,109 @@ async def test_api_create_mount_fails_special_chars(
|
||||
result = await resp.json()
|
||||
assert result["result"] == "error"
|
||||
assert "does not match regular expression" in result["message"]
|
||||
|
||||
|
||||
async def test_api_create_read_only_cifs_mount(
|
||||
api_client: TestClient,
|
||||
coresys: CoreSys,
|
||||
tmp_supervisor_data,
|
||||
path_extern,
|
||||
mount_propagation,
|
||||
):
|
||||
"""Test creating a read-only cifs mount via API."""
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "media_test",
|
||||
"type": "cifs",
|
||||
"usage": "media",
|
||||
"server": "media.local",
|
||||
"share": "media",
|
||||
"version": "2.0",
|
||||
"read_only": True,
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
|
||||
resp = await api_client.get("/mounts")
|
||||
result = await resp.json()
|
||||
|
||||
assert result["data"]["mounts"] == [
|
||||
{
|
||||
"version": "2.0",
|
||||
"name": "media_test",
|
||||
"type": "cifs",
|
||||
"usage": "media",
|
||||
"server": "media.local",
|
||||
"share": "media",
|
||||
"state": "active",
|
||||
"read_only": True,
|
||||
}
|
||||
]
|
||||
coresys.mounts.save_data.assert_called_once()
|
||||
|
||||
|
||||
async def test_api_create_read_only_nfs_mount(
|
||||
api_client: TestClient,
|
||||
coresys: CoreSys,
|
||||
tmp_supervisor_data,
|
||||
path_extern,
|
||||
mount_propagation,
|
||||
):
|
||||
"""Test creating a read-only nfs mount via API."""
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "media_test",
|
||||
"type": "nfs",
|
||||
"usage": "media",
|
||||
"server": "media.local",
|
||||
"path": "/media/camera",
|
||||
"read_only": True,
|
||||
},
|
||||
)
|
||||
result = await resp.json()
|
||||
assert result["result"] == "ok"
|
||||
|
||||
resp = await api_client.get("/mounts")
|
||||
result = await resp.json()
|
||||
|
||||
assert result["data"]["mounts"] == [
|
||||
{
|
||||
"name": "media_test",
|
||||
"type": "nfs",
|
||||
"usage": "media",
|
||||
"server": "media.local",
|
||||
"path": "/media/camera",
|
||||
"state": "active",
|
||||
"read_only": True,
|
||||
}
|
||||
]
|
||||
coresys.mounts.save_data.assert_called_once()
|
||||
|
||||
|
||||
async def test_api_read_only_backup_mount_invalid(
|
||||
api_client: TestClient,
|
||||
coresys: CoreSys,
|
||||
tmp_supervisor_data,
|
||||
path_extern,
|
||||
mount_propagation,
|
||||
):
|
||||
"""Test cannot create a read-only backup mount."""
|
||||
resp = await api_client.post(
|
||||
"/mounts",
|
||||
json={
|
||||
"name": "backup_test",
|
||||
"type": "cifs",
|
||||
"usage": "backup",
|
||||
"server": "backup.local",
|
||||
"share": "backup",
|
||||
"version": "2.0",
|
||||
"read_only": True,
|
||||
},
|
||||
)
|
||||
assert resp.status == 400
|
||||
result = await resp.json()
|
||||
assert result["result"] == "error"
|
||||
assert "Backup mounts cannot be read only" in result["message"]
|
||||
|
@ -27,6 +27,7 @@ from supervisor.exceptions import (
|
||||
BackupError,
|
||||
BackupInvalidError,
|
||||
BackupJobError,
|
||||
BackupMountDownError,
|
||||
DockerError,
|
||||
)
|
||||
from supervisor.homeassistant.api import HomeAssistantAPI
|
||||
@ -612,7 +613,8 @@ async def test_full_backup_to_mount(
|
||||
# Make a backup and add it to mounts. Confirm it exists in the right place
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
backup: Backup = await coresys.backups.do_backup_full("test", location=mount)
|
||||
with patch("supervisor.mounts.mount.Path.is_mount", return_value=True):
|
||||
backup: Backup = await coresys.backups.do_backup_full("test", location=mount)
|
||||
assert (mount_dir / f"{backup.slug}.tar").exists()
|
||||
|
||||
# Reload and check that backups in mounts are listed
|
||||
@ -661,7 +663,7 @@ async def test_partial_backup_to_mount(
|
||||
HomeAssistant,
|
||||
"version",
|
||||
new=PropertyMock(return_value=AwesomeVersion("2023.1.1")),
|
||||
):
|
||||
), patch("supervisor.mounts.mount.Path.is_mount", return_value=True):
|
||||
backup: Backup = await coresys.backups.do_backup_partial(
|
||||
"test", homeassistant=True, location=mount
|
||||
)
|
||||
@ -681,6 +683,37 @@ async def test_partial_backup_to_mount(
|
||||
assert marker.exists()
|
||||
|
||||
|
||||
async def test_backup_to_down_mount_error(
|
||||
coresys: CoreSys, tmp_supervisor_data, path_extern, mount_propagation
|
||||
):
|
||||
"""Test backup to mount when down raises error."""
|
||||
# Add a backup mount
|
||||
(mount_dir := coresys.config.path_mounts / "backup_test").mkdir()
|
||||
await coresys.mounts.load()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "backup_test",
|
||||
"usage": "backup",
|
||||
"type": "cifs",
|
||||
"server": "test.local",
|
||||
"share": "test",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
assert mount_dir in coresys.backups.backup_locations
|
||||
|
||||
# Attempt to make a backup which fails because is_mount on directory is false
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
with pytest.raises(BackupMountDownError):
|
||||
await coresys.backups.do_backup_full("test", location=mount)
|
||||
with pytest.raises(BackupMountDownError):
|
||||
await coresys.backups.do_backup_partial(
|
||||
"test", location=mount, homeassistant=True
|
||||
)
|
||||
|
||||
|
||||
async def test_backup_to_local_with_default(
|
||||
coresys: CoreSys,
|
||||
tmp_supervisor_data,
|
||||
@ -747,7 +780,7 @@ async def test_backup_to_default(
|
||||
HomeAssistant,
|
||||
"version",
|
||||
new=PropertyMock(return_value=AwesomeVersion("2023.1.1")),
|
||||
):
|
||||
), patch("supervisor.mounts.mount.Path.is_mount", return_value=True):
|
||||
backup: Backup = await coresys.backups.do_backup_partial(
|
||||
"test", homeassistant=True
|
||||
)
|
||||
@ -755,6 +788,34 @@ async def test_backup_to_default(
|
||||
assert (mount_dir / f"{backup.slug}.tar").exists()
|
||||
|
||||
|
||||
async def test_backup_to_default_mount_down_error(
|
||||
coresys: CoreSys, tmp_supervisor_data, path_extern, mount_propagation
|
||||
):
|
||||
"""Test making backup to default mount when it is down."""
|
||||
# Add a default backup mount
|
||||
(coresys.config.path_mounts / "backup_test").mkdir()
|
||||
await coresys.mounts.load()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "backup_test",
|
||||
"usage": "backup",
|
||||
"type": "cifs",
|
||||
"server": "test.local",
|
||||
"share": "test",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
coresys.mounts.default_backup_mount = mount
|
||||
|
||||
# Attempt to make a backup which fails because is_mount on directory is false
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
|
||||
with pytest.raises(BackupMountDownError):
|
||||
await coresys.backups.do_backup_partial("test", homeassistant=True)
|
||||
|
||||
|
||||
async def test_load_network_error(
|
||||
coresys: CoreSys,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
@ -1529,8 +1590,9 @@ async def test_backup_to_mount_bypasses_free_space_condition(
|
||||
mount = coresys.mounts.get("backup_test")
|
||||
|
||||
# These succeed because local free space does not matter when using a mount
|
||||
await coresys.backups.do_backup_full(location=mount)
|
||||
await coresys.backups.do_backup_partial(folders=["media"], location=mount)
|
||||
with patch("supervisor.mounts.mount.Path.is_mount", return_value=True):
|
||||
await coresys.backups.do_backup_full(location=mount)
|
||||
await coresys.backups.do_backup_partial(folders=["media"], location=mount)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -311,7 +311,7 @@ async def test_mount_failed_during_load(
|
||||
"mnt-data-supervisor-media-media_test.mount",
|
||||
"fail",
|
||||
[
|
||||
["Options", Variant("s", "bind")],
|
||||
["Options", Variant("s", "ro,bind")],
|
||||
[
|
||||
"Description",
|
||||
Variant("s", "Supervisor bind mount: emergency_media_test"),
|
||||
@ -498,6 +498,7 @@ async def test_save_data(
|
||||
"share": "backups",
|
||||
"username": "admin",
|
||||
"password": "password",
|
||||
"read_only": False,
|
||||
}
|
||||
]
|
||||
|
||||
|
@ -64,6 +64,7 @@ async def test_cifs_mount(
|
||||
"version": None,
|
||||
"username": "admin",
|
||||
"password": "password",
|
||||
"read_only": False,
|
||||
**additional_data,
|
||||
}
|
||||
mount: CIFSMount = Mount.from_dict(coresys, mount_data)
|
||||
@ -75,6 +76,7 @@ async def test_cifs_mount(
|
||||
assert mount.port is None
|
||||
assert mount.state is None
|
||||
assert mount.unit is None
|
||||
assert mount.read_only is False
|
||||
|
||||
assert mount.what == "//test.local/camera"
|
||||
assert mount.where == Path("/mnt/data/supervisor/mounts/test")
|
||||
@ -136,6 +138,51 @@ async def test_cifs_mount(
|
||||
assert not mount.path_credentials.exists()
|
||||
|
||||
|
||||
async def test_cifs_mount_read_only(
|
||||
coresys: CoreSys,
|
||||
all_dbus_services: dict[str, DBusServiceMock],
|
||||
tmp_supervisor_data: Path,
|
||||
path_extern,
|
||||
):
|
||||
"""Test a read-only cifs mount."""
|
||||
systemd_service: SystemdService = all_dbus_services["systemd"]
|
||||
systemd_service.StartTransientUnit.calls.clear()
|
||||
|
||||
mount_data = {
|
||||
"name": "test",
|
||||
"usage": "media",
|
||||
"type": "cifs",
|
||||
"server": "test.local",
|
||||
"share": "camera",
|
||||
"version": None,
|
||||
"read_only": True,
|
||||
}
|
||||
mount: CIFSMount = Mount.from_dict(coresys, mount_data)
|
||||
|
||||
assert isinstance(mount, CIFSMount)
|
||||
assert mount.read_only is True
|
||||
|
||||
await mount.mount()
|
||||
|
||||
assert mount.state == UnitActiveState.ACTIVE
|
||||
assert mount.local_where.exists()
|
||||
assert mount.local_where.is_dir()
|
||||
|
||||
assert systemd_service.StartTransientUnit.calls == [
|
||||
(
|
||||
"mnt-data-supervisor-mounts-test.mount",
|
||||
"fail",
|
||||
[
|
||||
["Options", Variant("s", "ro,noserverino,guest")],
|
||||
["Type", Variant("s", "cifs")],
|
||||
["Description", Variant("s", "Supervisor cifs mount: test")],
|
||||
["What", Variant("s", "//test.local/camera")],
|
||||
],
|
||||
[],
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
async def test_nfs_mount(
|
||||
coresys: CoreSys,
|
||||
all_dbus_services: dict[str, DBusServiceMock],
|
||||
@ -153,6 +200,7 @@ async def test_nfs_mount(
|
||||
"server": "test.local",
|
||||
"path": "/media/camera",
|
||||
"port": 1234,
|
||||
"read_only": False,
|
||||
}
|
||||
mount: NFSMount = Mount.from_dict(coresys, mount_data)
|
||||
|
||||
@ -163,6 +211,7 @@ async def test_nfs_mount(
|
||||
assert mount.port == 1234
|
||||
assert mount.state is None
|
||||
assert mount.unit is None
|
||||
assert mount.read_only is False
|
||||
|
||||
assert mount.what == "test.local:/media/camera"
|
||||
assert mount.where == Path("/mnt/data/supervisor/mounts/test")
|
||||
@ -193,6 +242,51 @@ async def test_nfs_mount(
|
||||
]
|
||||
|
||||
|
||||
async def test_nfs_mount_read_only(
|
||||
coresys: CoreSys,
|
||||
all_dbus_services: dict[str, DBusServiceMock],
|
||||
tmp_supervisor_data: Path,
|
||||
path_extern,
|
||||
):
|
||||
"""Test NFS mount."""
|
||||
systemd_service: SystemdService = all_dbus_services["systemd"]
|
||||
systemd_service.StartTransientUnit.calls.clear()
|
||||
|
||||
mount_data = {
|
||||
"name": "test",
|
||||
"usage": "media",
|
||||
"type": "nfs",
|
||||
"server": "test.local",
|
||||
"path": "/media/camera",
|
||||
"port": 1234,
|
||||
"read_only": True,
|
||||
}
|
||||
mount: NFSMount = Mount.from_dict(coresys, mount_data)
|
||||
|
||||
assert isinstance(mount, NFSMount)
|
||||
assert mount.read_only is True
|
||||
|
||||
await mount.mount()
|
||||
|
||||
assert mount.state == UnitActiveState.ACTIVE
|
||||
assert mount.local_where.exists()
|
||||
assert mount.local_where.is_dir()
|
||||
|
||||
assert systemd_service.StartTransientUnit.calls == [
|
||||
(
|
||||
"mnt-data-supervisor-mounts-test.mount",
|
||||
"fail",
|
||||
[
|
||||
["Options", Variant("s", "ro,port=1234,soft,timeo=200")],
|
||||
["Type", Variant("s", "nfs")],
|
||||
["Description", Variant("s", "Supervisor nfs mount: test")],
|
||||
["What", Variant("s", "test.local:/media/camera")],
|
||||
],
|
||||
[],
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
async def test_load(
|
||||
coresys: CoreSys,
|
||||
all_dbus_services: dict[str, DBusServiceMock],
|
||||
|
Loading…
x
Reference in New Issue
Block a user