Allow all characters in mount credentials (#4399)

* Allow all characters in mount credentials

* Fix permissions on credential files

* Fix pylint issue
This commit is contained in:
Mike Degatano 2023-06-22 15:55:13 -04:00 committed by GitHub
parent d3031e2eae
commit 9a7d547394
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 11 deletions

View File

@ -202,11 +202,18 @@ def initialize_system(coresys: CoreSys) -> None:
_LOGGER.debug("Creating Supervisor media folder at '%s'", config.path_media) _LOGGER.debug("Creating Supervisor media folder at '%s'", config.path_media)
config.path_media.mkdir() config.path_media.mkdir()
# Mounts folder # Mounts folders
if not config.path_mounts.is_dir(): if not config.path_mounts.is_dir():
_LOGGER.debug("Creating Supervisor mounts folder at '%s'", config.path_mounts) _LOGGER.debug("Creating Supervisor mounts folder at '%s'", config.path_mounts)
config.path_mounts.mkdir() config.path_mounts.mkdir()
if not config.path_mounts_credentials.is_dir():
_LOGGER.debug(
"Creating Supervisor mounts credentials folder at '%s'",
config.path_mounts_credentials,
)
config.path_mounts_credentials.mkdir(mode=0o600)
# Emergency folder # Emergency folder
if not config.path_emergency.is_dir(): if not config.path_emergency.is_dir():
_LOGGER.debug( _LOGGER.debug(

View File

@ -46,6 +46,7 @@ DNS_DATA = PurePath("dns")
AUDIO_DATA = PurePath("audio") AUDIO_DATA = PurePath("audio")
MEDIA_DATA = PurePath("media") MEDIA_DATA = PurePath("media")
MOUNTS_FOLDER = PurePath("mounts") MOUNTS_FOLDER = PurePath("mounts")
MOUNTS_CREDENTIALS = PurePath(".mounts_credentials")
EMERGENCY_DATA = PurePath("emergency") EMERGENCY_DATA = PurePath("emergency")
DEFAULT_BOOT_TIME = datetime.utcfromtimestamp(0).isoformat() DEFAULT_BOOT_TIME = datetime.utcfromtimestamp(0).isoformat()
@ -315,6 +316,16 @@ class CoreConfig(FileConfiguration):
"""Return mounts path external for Docker.""" """Return mounts path external for Docker."""
return self.path_extern_supervisor / MOUNTS_FOLDER return self.path_extern_supervisor / MOUNTS_FOLDER
@property
def path_mounts_credentials(self) -> Path:
"""Return mounts credentials folder."""
return self.path_supervisor / MOUNTS_CREDENTIALS
@property
def path_extern_mounts_credentials(self) -> PurePath:
"""Return mounts credentials path external for Docker."""
return self.path_extern_supervisor / MOUNTS_CREDENTIALS
@property @property
def path_emergency(self) -> Path: def path_emergency(self) -> Path:
"""Return emergency data folder.""" """Return emergency data folder."""

View File

@ -388,12 +388,38 @@ class CIFSMount(NetworkMount):
options.append(f"vers={self.version}") options.append(f"vers={self.version}")
if self.username and self.password: if self.username and self.password:
options.extend([f"username={self.username}", f"password={self.password}"]) options.append(f"credentials={self.path_extern_credentials.as_posix()}")
else: else:
options.append("guest") options.append("guest")
return options return options
@property
def path_credentials(self) -> Path:
"""Path to credentials file."""
return self.sys_config.path_mounts_credentials / self.name
@property
def path_extern_credentials(self) -> PurePath:
"""Path to credentials file external to Docker."""
return self.sys_config.path_extern_mounts_credentials / self.name
async def mount(self) -> None:
"""Mount using systemd."""
if self.username and self.password:
if not self.path_credentials.exists():
self.path_credentials.touch(mode=0o600)
with self.path_credentials.open(mode="w") as cred_file:
cred_file.write(f"username={self.username}\npassword={self.password}")
await super().mount()
async def unmount(self) -> None:
"""Unmount using systemd."""
self.path_credentials.unlink(missing_ok=True)
await super().unmount()
class NFSMount(NetworkMount): class NFSMount(NetworkMount):
"""An NFS type mount.""" """An NFS type mount."""

View File

@ -34,8 +34,6 @@ RE_MOUNT_OPTION = re.compile(r"^[^,=]+$")
VALIDATE_NAME = vol.Match(RE_MOUNT_NAME) VALIDATE_NAME = vol.Match(RE_MOUNT_NAME)
VALIDATE_SERVER = vol.Match(RE_PATH_PART) VALIDATE_SERVER = vol.Match(RE_PATH_PART)
VALIDATE_SHARE = vol.Match(RE_PATH_PART) VALIDATE_SHARE = vol.Match(RE_PATH_PART)
VALIDATE_USERNAME = vol.Match(RE_MOUNT_OPTION)
VALIDATE_PASSWORD = vol.Match(RE_MOUNT_OPTION)
_SCHEMA_BASE_MOUNT_CONFIG = vol.Schema( _SCHEMA_BASE_MOUNT_CONFIG = vol.Schema(
{ {
@ -57,8 +55,8 @@ SCHEMA_MOUNT_CIFS = _SCHEMA_MOUNT_NETWORK.extend(
{ {
vol.Required(ATTR_TYPE): MountType.CIFS.value, vol.Required(ATTR_TYPE): MountType.CIFS.value,
vol.Required(ATTR_SHARE): VALIDATE_SHARE, vol.Required(ATTR_SHARE): VALIDATE_SHARE,
vol.Inclusive(ATTR_USERNAME, "basic_auth"): VALIDATE_USERNAME, vol.Inclusive(ATTR_USERNAME, "basic_auth"): str,
vol.Inclusive(ATTR_PASSWORD, "basic_auth"): VALIDATE_PASSWORD, vol.Inclusive(ATTR_PASSWORD, "basic_auth"): str,
vol.Optional(ATTR_VERSION, default=None): vol.Maybe( vol.Optional(ATTR_VERSION, default=None): vol.Maybe(
vol.Coerce(MountCifsVersion) vol.Coerce(MountCifsVersion)
), ),

View File

@ -371,6 +371,7 @@ async def tmp_supervisor_data(coresys: CoreSys, tmp_path: Path) -> Path:
coresys.config.path_emergency.mkdir() coresys.config.path_emergency.mkdir()
coresys.config.path_media.mkdir() coresys.config.path_media.mkdir()
coresys.config.path_mounts.mkdir() coresys.config.path_mounts.mkdir()
coresys.config.path_mounts_credentials.mkdir()
coresys.config.path_backup.mkdir() coresys.config.path_backup.mkdir()
coresys.config.path_tmp.mkdir() coresys.config.path_tmp.mkdir()
coresys.config.path_homeassistant.mkdir() coresys.config.path_homeassistant.mkdir()

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import os import os
from pathlib import Path from pathlib import Path
import stat
from typing import Any from typing import Any
from unittest.mock import patch from unittest.mock import patch
@ -78,8 +79,7 @@ async def test_cifs_mount(
assert mount.where == Path("/mnt/data/supervisor/mounts/test") assert mount.where == Path("/mnt/data/supervisor/mounts/test")
assert mount.local_where == tmp_supervisor_data / "mounts" / "test" assert mount.local_where == tmp_supervisor_data / "mounts" / "test"
assert mount.options == ["noserverino"] + expected_options + [ assert mount.options == ["noserverino"] + expected_options + [
f"username={mount_data['username']}", "credentials=/mnt/data/supervisor/.mounts_credentials/test",
f"password={mount_data['password']}",
] ]
assert not mount.local_where.exists() assert not mount.local_where.exists()
@ -107,8 +107,7 @@ async def test_cifs_mount(
["noserverino"] ["noserverino"]
+ expected_options + expected_options
+ [ + [
f"username={mount_data['username']}", "credentials=/mnt/data/supervisor/.mounts_credentials/test"
f"password={mount_data['password']}",
] ]
), ),
), ),
@ -120,6 +119,19 @@ async def test_cifs_mount(
[], [],
) )
] ]
assert mount.path_credentials.exists()
with mount.path_credentials.open("r") as creds:
assert creds.read().split("\n") == [
f"username={mount_data['username']}",
f"password={mount_data['password']}",
]
cred_stat = mount.path_credentials.stat()
assert not cred_stat.st_mode & stat.S_IRGRP
assert not cred_stat.st_mode & stat.S_IROTH
await mount.unmount()
assert not mount.path_credentials.exists()
async def test_nfs_mount( async def test_nfs_mount(
@ -279,7 +291,7 @@ async def test_unmount(
systemd_service: SystemdService = all_dbus_services["systemd"] systemd_service: SystemdService = all_dbus_services["systemd"]
systemd_service.StopUnit.calls.clear() systemd_service.StopUnit.calls.clear()
mount = Mount.from_dict( mount: CIFSMount = Mount.from_dict(
coresys, coresys,
{ {
"name": "test", "name": "test",