Add backup agent to Synology DSM (#135227)

* pre-alpha state

* small type

* use ChunkAsyncStreamIterator from aiohttp_client helper

* create parent folders during upload if none exists

* check file station permissionsduring setup

* ensure backup-agents are reloaded

* adjust config flow

* fix check for availability of file station

* fix possible unbound

* add config flow tests

* fix existing tests

* add backup tests

* backup listeners are not async

* some more tests

* migrate existing config entries

* fix migration

* notify backup listeners only when needed during setup

* add backup settings to options flow

* switch back to the listener approach from the dev docs example

* add negative tests

* fix tests

* use HassKey

* fix tests

* Revert "use HassKey"

This reverts commit 71c5a4d6fa9c04b4907ff5f8df6ef7bd1737aa85.

* use hass loop call_soon instead of non-eager-start tasks

* use HassKey for backup-agent-listeners

* delete empty backup-agent-listener list from hass.data

* don't handle single file download errors

* Apply suggestions from code review

Co-authored-by: J. Nick Koston <nick@koston.org>

* add more tests

* we don't have entities related to file_station api

* add more backup tests

* test unload backup agent

* revert sorting of properties

* additional use hass config location for default backup path

---------

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Michael 2025-01-25 22:31:30 +01:00 committed by GitHub
parent 5e6f624938
commit cf8409dcd2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1297 additions and 38 deletions

View File

@ -11,12 +11,15 @@ from synology_dsm.exceptions import SynologyDSMNotLoggedInException
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_MAC, CONF_VERIFY_SSL
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr
from .common import SynoApi, raise_config_entry_auth_error
from .const import (
CONF_BACKUP_PATH,
CONF_BACKUP_SHARE,
DATA_BACKUP_AGENT_LISTENERS,
DEFAULT_VERIFY_SSL,
DOMAIN,
EXCEPTION_DETAILS,
@ -60,6 +63,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.config_entries.async_update_entry(
entry, data={**entry.data, CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL}
)
if CONF_BACKUP_SHARE not in entry.options:
hass.config_entries.async_update_entry(
entry,
options={**entry.options, CONF_BACKUP_SHARE: None, CONF_BACKUP_PATH: None},
)
# Continue setup
api = SynoApi(hass, entry)
@ -118,6 +126,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(_async_update_listener))
if entry.options[CONF_BACKUP_SHARE]:
_async_notify_backup_listeners_soon(hass)
return True
@ -127,9 +138,20 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
entry_data: SynologyDSMData = hass.data[DOMAIN][entry.unique_id]
await entry_data.api.async_unload()
hass.data[DOMAIN].pop(entry.unique_id)
_async_notify_backup_listeners_soon(hass)
return unload_ok
def _async_notify_backup_listeners(hass: HomeAssistant) -> None:
for listener in hass.data.get(DATA_BACKUP_AGENT_LISTENERS, []):
listener()
@callback
def _async_notify_backup_listeners_soon(hass: HomeAssistant) -> None:
hass.loop.call_soon(_async_notify_backup_listeners, hass)
async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)

View File

@ -0,0 +1,223 @@
"""Support for Synology DSM backup agents."""
from __future__ import annotations
from collections.abc import AsyncIterator, Callable, Coroutine
import logging
from typing import TYPE_CHECKING, Any
from aiohttp import StreamReader
from synology_dsm.api.file_station import SynoFileStation
from synology_dsm.exceptions import SynologyDSMAPIErrorException
from homeassistant.components.backup import AgentBackup, BackupAgent, BackupAgentError
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import ChunkAsyncStreamIterator
from homeassistant.helpers.json import json_dumps
from homeassistant.util.json import JsonObjectType, json_loads_object
from .const import (
CONF_BACKUP_PATH,
CONF_BACKUP_SHARE,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)
from .models import SynologyDSMData
LOGGER = logging.getLogger(__name__)
async def async_get_backup_agents(
hass: HomeAssistant,
) -> list[BackupAgent]:
"""Return a list of backup agents."""
if not (
entries := hass.config_entries.async_loaded_entries(DOMAIN)
) or not hass.data.get(DOMAIN):
LOGGER.debug("No proper config entry found")
return []
syno_datas: dict[str, SynologyDSMData] = hass.data[DOMAIN]
return [
SynologyDSMBackupAgent(hass, entry)
for entry in entries
if entry.unique_id is not None
and (syno_data := syno_datas.get(entry.unique_id))
and syno_data.api.file_station
and entry.options.get(CONF_BACKUP_PATH)
]
@callback
def async_register_backup_agents_listener(
hass: HomeAssistant,
*,
listener: Callable[[], None],
**kwargs: Any,
) -> Callable[[], None]:
"""Register a listener to be called when agents are added or removed.
:return: A function to unregister the listener.
"""
hass.data.setdefault(DATA_BACKUP_AGENT_LISTENERS, []).append(listener)
@callback
def remove_listener() -> None:
"""Remove the listener."""
hass.data[DATA_BACKUP_AGENT_LISTENERS].remove(listener)
if not hass.data[DATA_BACKUP_AGENT_LISTENERS]:
del hass.data[DATA_BACKUP_AGENT_LISTENERS]
return remove_listener
class SynologyDSMBackupAgent(BackupAgent):
"""Synology DSM backup agent."""
domain = DOMAIN
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Initialize the Synology DSM backup agent."""
super().__init__()
LOGGER.debug("Initializing Synology DSM backup agent for %s", entry.unique_id)
self.name = entry.title
self.path = (
f"{entry.options[CONF_BACKUP_SHARE]}/{entry.options[CONF_BACKUP_PATH]}"
)
syno_data: SynologyDSMData = hass.data[DOMAIN][entry.unique_id]
self.api = syno_data.api
@property
def _file_station(self) -> SynoFileStation:
if TYPE_CHECKING:
# we ensure that file_station exist already in async_get_backup_agents
assert self.api.file_station
return self.api.file_station
async def async_download_backup(
self,
backup_id: str,
**kwargs: Any,
) -> AsyncIterator[bytes]:
"""Download a backup file.
:param backup_id: The ID of the backup that was returned in async_list_backups.
:return: An async iterator that yields bytes.
"""
try:
resp = await self._file_station.download_file(
path=self.path,
filename=f"{backup_id}.tar",
)
except SynologyDSMAPIErrorException as err:
raise BackupAgentError("Failed to download backup") from err
if TYPE_CHECKING:
assert isinstance(resp, StreamReader)
return ChunkAsyncStreamIterator(resp)
async def async_upload_backup(
self,
*,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
backup: AgentBackup,
**kwargs: Any,
) -> None:
"""Upload a backup.
:param open_stream: A function returning an async iterator that yields bytes.
:param backup: Metadata about the backup that should be uploaded.
"""
# upload backup.tar file first
try:
await self._file_station.upload_file(
path=self.path,
filename=f"{backup.backup_id}.tar",
source=await open_stream(),
create_parents=True,
)
except SynologyDSMAPIErrorException as err:
raise BackupAgentError("Failed to upload backup") from err
# upload backup_meta.json file when backup.tar was successful uploaded
try:
await self._file_station.upload_file(
path=self.path,
filename=f"{backup.backup_id}_meta.json",
source=json_dumps(backup.as_dict()).encode(),
)
except SynologyDSMAPIErrorException as err:
raise BackupAgentError("Failed to upload backup") from err
async def async_delete_backup(
self,
backup_id: str,
**kwargs: Any,
) -> None:
"""Delete a backup file.
:param backup_id: The ID of the backup that was returned in async_list_backups.
"""
try:
await self._file_station.delete_file(
path=self.path, filename=f"{backup_id}.tar"
)
await self._file_station.delete_file(
path=self.path, filename=f"{backup_id}_meta.json"
)
except SynologyDSMAPIErrorException as err:
raise BackupAgentError("Failed to delete the backup") from err
async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
"""List backups."""
return list((await self._async_list_backups(**kwargs)).values())
async def _async_list_backups(self, **kwargs: Any) -> dict[str, AgentBackup]:
"""List backups."""
async def _download_meta_data(filename: str) -> JsonObjectType:
try:
resp = await self._file_station.download_file(
path=self.path, filename=filename
)
except SynologyDSMAPIErrorException as err:
raise BackupAgentError("Failed to download meta data") from err
if TYPE_CHECKING:
assert isinstance(resp, StreamReader)
try:
return json_loads_object(await resp.read())
except Exception as err:
raise BackupAgentError("Failed to read meta data") from err
try:
files = await self._file_station.get_files(path=self.path)
except SynologyDSMAPIErrorException as err:
raise BackupAgentError("Failed to list backups") from err
if TYPE_CHECKING:
assert files
backups: dict[str, AgentBackup] = {}
for file in files:
if file.name.endswith("_meta.json"):
try:
meta_data = await _download_meta_data(file.name)
except BackupAgentError as err:
LOGGER.error("Failed to download meta data: %s", err)
continue
agent_backup = AgentBackup.from_dict(meta_data)
backups[agent_backup.backup_id] = agent_backup
return backups
async def async_get_backup(
self,
backup_id: str,
**kwargs: Any,
) -> AgentBackup | None:
"""Return a backup."""
backups = await self._async_list_backups()
return backups.get(backup_id)

View File

@ -14,6 +14,7 @@ from synology_dsm.api.core.upgrade import SynoCoreUpgrade
from synology_dsm.api.core.utilization import SynoCoreUtilization
from synology_dsm.api.dsm.information import SynoDSMInformation
from synology_dsm.api.dsm.network import SynoDSMNetwork
from synology_dsm.api.file_station import SynoFileStation
from synology_dsm.api.photos import SynoPhotos
from synology_dsm.api.storage.storage import SynoStorage
from synology_dsm.api.surveillance_station import SynoSurveillanceStation
@ -62,11 +63,12 @@ class SynoApi:
self.config_url = f"http://{entry.data[CONF_HOST]}:{entry.data[CONF_PORT]}"
# DSM APIs
self.file_station: SynoFileStation | None = None
self.information: SynoDSMInformation | None = None
self.network: SynoDSMNetwork | None = None
self.photos: SynoPhotos | None = None
self.security: SynoCoreSecurity | None = None
self.storage: SynoStorage | None = None
self.photos: SynoPhotos | None = None
self.surveillance_station: SynoSurveillanceStation | None = None
self.system: SynoCoreSystem | None = None
self.upgrade: SynoCoreUpgrade | None = None
@ -74,10 +76,11 @@ class SynoApi:
# Should we fetch them
self._fetching_entities: dict[str, set[str]] = {}
self._with_file_station = True
self._with_information = True
self._with_photos = True
self._with_security = True
self._with_storage = True
self._with_photos = True
self._with_surveillance_station = True
self._with_system = True
self._with_upgrade = True
@ -157,6 +160,26 @@ class SynoApi:
self.dsm.reset(SynoCoreUpgrade.API_KEY)
LOGGER.debug("Disabled fetching upgrade data during setup: %s", ex)
# check if file station is used and permitted
self._with_file_station = bool(self.dsm.apis.get(SynoFileStation.LIST_API_KEY))
if self._with_file_station:
shares: list | None = None
with suppress(*SYNOLOGY_CONNECTION_EXCEPTIONS):
shares = await self.dsm.file.get_shared_folders(only_writable=True)
if not shares:
self._with_file_station = False
self.dsm.reset(SynoFileStation.API_KEY)
LOGGER.debug(
"File Station found, but disabled due to missing user"
" permissions or no writable shared folders available"
)
LOGGER.debug(
"State of File Station during setup of '%s': %s",
self._entry.unique_id,
self._with_file_station,
)
await self._fetch_device_configuration()
try:
@ -225,6 +248,15 @@ class SynoApi:
self.dsm.reset(self.security)
self.security = None
if not self._with_file_station:
LOGGER.debug(
"Disable file station api from being updated or '%s'",
self._entry.unique_id,
)
if self.file_station:
self.dsm.reset(self.file_station)
self.file_station = None
if not self._with_photos:
LOGGER.debug(
"Disable photos api from being updated or '%s'", self._entry.unique_id
@ -272,6 +304,12 @@ class SynoApi:
self.network = self.dsm.network
await self.network.update()
if self._with_file_station:
LOGGER.debug(
"Enable file station api updates for '%s'", self._entry.unique_id
)
self.file_station = self.dsm.file
if self._with_security:
LOGGER.debug("Enable security api updates for '%s'", self._entry.unique_id)
self.security = self.dsm.security

View File

@ -3,12 +3,14 @@
from __future__ import annotations
from collections.abc import Mapping
from contextlib import suppress
from ipaddress import ip_address as ip
import logging
from typing import Any, cast
from typing import TYPE_CHECKING, Any, cast
from urllib.parse import urlparse
from synology_dsm import SynologyDSM
from synology_dsm.api.file_station.models import SynoFileSharedFolder
from synology_dsm.exceptions import (
SynologyDSMException,
SynologyDSMLogin2SAFailedException,
@ -40,6 +42,12 @@ from homeassistant.core import callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
)
from homeassistant.helpers.service_info.ssdp import (
ATTR_UPNP_FRIENDLY_NAME,
ATTR_UPNP_SERIAL,
@ -47,12 +55,16 @@ from homeassistant.helpers.service_info.ssdp import (
)
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from homeassistant.helpers.typing import DiscoveryInfoType, VolDictType
from homeassistant.util import slugify
from homeassistant.util.network import is_ip_address as is_ip
from .const import (
CONF_BACKUP_PATH,
CONF_BACKUP_SHARE,
CONF_DEVICE_TOKEN,
CONF_SNAPSHOT_QUALITY,
CONF_VOLUMES,
DEFAULT_BACKUP_PATH,
DEFAULT_PORT,
DEFAULT_PORT_SSL,
DEFAULT_SCAN_INTERVAL,
@ -61,7 +73,9 @@ from .const import (
DEFAULT_USE_SSL,
DEFAULT_VERIFY_SSL,
DOMAIN,
SYNOLOGY_CONNECTION_EXCEPTIONS,
)
from .models import SynologyDSMData
_LOGGER = logging.getLogger(__name__)
@ -131,6 +145,7 @@ class SynologyDSMFlowHandler(ConfigFlow, domain=DOMAIN):
self.discovered_conf: dict[str, Any] = {}
self.reauth_conf: Mapping[str, Any] = {}
self.reauth_reason: str | None = None
self.shares: list[SynoFileSharedFolder] | None = None
def _show_form(
self,
@ -173,6 +188,8 @@ class SynologyDSMFlowHandler(ConfigFlow, domain=DOMAIN):
verify_ssl = user_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL)
otp_code = user_input.get(CONF_OTP_CODE)
friendly_name = user_input.get(CONF_NAME)
backup_path = user_input.get(CONF_BACKUP_PATH)
backup_share = user_input.get(CONF_BACKUP_SHARE)
if not port:
if use_ssl is True:
@ -209,6 +226,12 @@ class SynologyDSMFlowHandler(ConfigFlow, domain=DOMAIN):
if errors:
return self._show_form(step_id, user_input, errors)
with suppress(*SYNOLOGY_CONNECTION_EXCEPTIONS):
self.shares = await api.file.get_shared_folders(only_writable=True)
if self.shares and not backup_path:
return await self.async_step_backup_share(user_input)
# unique_id should be serial for services purpose
existing_entry = await self.async_set_unique_id(serial, raise_on_progress=False)
@ -221,6 +244,10 @@ class SynologyDSMFlowHandler(ConfigFlow, domain=DOMAIN):
CONF_PASSWORD: password,
CONF_MAC: api.network.macs,
}
config_options = {
CONF_BACKUP_PATH: backup_path,
CONF_BACKUP_SHARE: backup_share,
}
if otp_code:
config_data[CONF_DEVICE_TOKEN] = api.device_token
if user_input.get(CONF_DISKS):
@ -233,10 +260,12 @@ class SynologyDSMFlowHandler(ConfigFlow, domain=DOMAIN):
"reauth_successful" if self.reauth_conf else "reconfigure_successful"
)
return self.async_update_reload_and_abort(
existing_entry, data=config_data, reason=reason
existing_entry, data=config_data, options=config_options, reason=reason
)
return self.async_create_entry(title=friendly_name or host, data=config_data)
return self.async_create_entry(
title=friendly_name or host, data=config_data, options=config_options
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
@ -368,6 +397,43 @@ class SynologyDSMFlowHandler(ConfigFlow, domain=DOMAIN):
return await self.async_step_user(user_input)
async def async_step_backup_share(
self, user_input: dict[str, Any], errors: dict[str, str] | None = None
) -> ConfigFlowResult:
"""Select backup location."""
if TYPE_CHECKING:
assert self.shares is not None
if not self.saved_user_input:
self.saved_user_input = user_input
if CONF_BACKUP_PATH not in user_input and CONF_BACKUP_SHARE not in user_input:
return self.async_show_form(
step_id="backup_share",
data_schema=vol.Schema(
{
vol.Required(CONF_BACKUP_SHARE): SelectSelector(
SelectSelectorConfig(
options=[
SelectOptionDict(value=s.path, label=s.name)
for s in self.shares
],
mode=SelectSelectorMode.DROPDOWN,
),
),
vol.Required(
CONF_BACKUP_PATH,
default=f"{DEFAULT_BACKUP_PATH}_{slugify(self.hass.config.location_name)}",
): str,
}
),
)
user_input = {**self.saved_user_input, **user_input}
self.saved_user_input = {}
return await self.async_step_user(user_input)
def _async_get_existing_entry(self, discovered_mac: str) -> ConfigEntry | None:
"""See if we already have a configured NAS with this MAC address."""
for entry in self._async_current_entries():
@ -388,6 +454,8 @@ class SynologyDSMOptionsFlowHandler(OptionsFlow):
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
syno_data: SynologyDSMData = self.hass.data[DOMAIN][self.config_entry.unique_id]
data_schema = vol.Schema(
{
vol.Required(
@ -404,6 +472,36 @@ class SynologyDSMOptionsFlowHandler(OptionsFlow):
): vol.All(vol.Coerce(int), vol.Range(min=0, max=2)),
}
)
shares: list[SynoFileSharedFolder] | None = None
if syno_data.api.file_station:
with suppress(*SYNOLOGY_CONNECTION_EXCEPTIONS):
shares = await syno_data.api.file_station.get_shared_folders(
only_writable=True
)
if shares:
data_schema = data_schema.extend(
{
vol.Required(
CONF_BACKUP_SHARE,
default=self.config_entry.options[CONF_BACKUP_SHARE],
): SelectSelector(
SelectSelectorConfig(
options=[
SelectOptionDict(value=s.path, label=s.name)
for s in shares
],
mode=SelectSelectorMode.DROPDOWN,
),
),
vol.Required(
CONF_BACKUP_PATH,
default=self.config_entry.options[CONF_BACKUP_PATH],
): str,
}
)
return self.async_show_form(step_id="init", data_schema=data_schema)

View File

@ -2,6 +2,8 @@
from __future__ import annotations
from collections.abc import Callable
from aiohttp import ClientTimeout
from synology_dsm.api.surveillance_station.const import SNAPSHOT_PROFILE_BALANCED
from synology_dsm.exceptions import (
@ -15,8 +17,12 @@ from synology_dsm.exceptions import (
)
from homeassistant.const import Platform
from homeassistant.util.hass_dict import HassKey
DOMAIN = "synology_dsm"
DATA_BACKUP_AGENT_LISTENERS: HassKey[list[Callable[[], None]]] = HassKey(
f"{DOMAIN}_backup_agent_listeners"
)
ATTRIBUTION = "Data provided by Synology"
PLATFORMS = [
Platform.BINARY_SENSOR,
@ -34,6 +40,8 @@ CONF_SERIAL = "serial"
CONF_VOLUMES = "volumes"
CONF_DEVICE_TOKEN = "device_token"
CONF_SNAPSHOT_QUALITY = "snap_profile_type"
CONF_BACKUP_SHARE = "backup_share"
CONF_BACKUP_PATH = "backup_path"
DEFAULT_USE_SSL = True
DEFAULT_VERIFY_SSL = False
@ -43,6 +51,7 @@ DEFAULT_PORT_SSL = 5001
DEFAULT_SCAN_INTERVAL = 15 # min
DEFAULT_TIMEOUT = ClientTimeout(total=60, connect=15)
DEFAULT_SNAPSHOT_QUALITY = SNAPSHOT_PROFILE_BALANCED
DEFAULT_BACKUP_PATH = "ha_backup"
ENTITY_UNIT_LOAD = "load"

View File

@ -21,6 +21,17 @@
"otp_code": "Code"
}
},
"backup_share": {
"title": "Synology DSM: Backup location",
"data": {
"backup_share": "Shared folder",
"backup_path": "Path"
},
"data_description": {
"backup_share": "Select the shared folder, where the automatic Home-Assistant backup should be stored.",
"backup_path": "Define the path on the selected shared folder (will automatically be created, if not exist)."
}
},
"link": {
"description": "Do you want to set up {name} ({host})?",
"data": {

View File

@ -34,5 +34,5 @@ def fixture_dsm():
dsm.network.update = AsyncMock(return_value=True)
dsm.surveillance_station.update = AsyncMock(return_value=True)
dsm.upgrade.update = AsyncMock(return_value=True)
dsm.file = AsyncMock(get_shared_folders=AsyncMock(return_value=None))
return dsm

View File

@ -84,3 +84,17 @@
'verify_ssl': False,
})
# ---
# name: test_user_with_filestation
dict({
'host': 'nas.meontheinternet.com',
'mac': list([
'00-11-32-XX-XX-59',
'00-11-32-XX-XX-5A',
]),
'password': 'password',
'port': 1234,
'ssl': True,
'username': 'Home_Assistant',
'verify_ssl': False,
})
# ---

View File

@ -0,0 +1,709 @@
"""Tests for the Synology DSM backup agent."""
from io import StringIO
from typing import Any
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from synology_dsm.api.file_station.models import SynoFileFile, SynoFileSharedFolder
from synology_dsm.exceptions import SynologyDSMAPIErrorException
from homeassistant.components.backup import (
DOMAIN as BACKUP_DOMAIN,
AddonInfo,
AgentBackup,
Folder,
)
from homeassistant.components.synology_dsm.const import (
CONF_BACKUP_PATH,
CONF_BACKUP_SHARE,
DOMAIN,
)
from homeassistant.const import (
CONF_HOST,
CONF_MAC,
CONF_PASSWORD,
CONF_PORT,
CONF_SSL,
CONF_USERNAME,
)
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from homeassistant.util.aiohttp import MockStreamReader
from .consts import HOST, MACS, PASSWORD, PORT, SERIAL, USE_SSL, USERNAME
from tests.common import MockConfigEntry
from tests.typing import ClientSessionGenerator, WebSocketGenerator
class MockStreamReaderChunked(MockStreamReader):
"""Mock a stream reader with simulated chunked data."""
async def readchunk(self) -> tuple[bytes, bool]:
"""Read bytes."""
return (self._content.read(), False)
async def _mock_download_file(path: str, filename: str) -> MockStreamReader:
if filename == "abcd12ef_meta.json":
return MockStreamReader(
b'{"addons":[],"backup_id":"abcd12ef","date":"2025-01-09T20:14:35.457323+01:00",'
b'"database_included":true,"extra_metadata":{"instance_id":"36b3b7e984da43fc89f7bafb2645fa36",'
b'"with_automatic_settings":true},"folders":[],"homeassistant_included":true,'
b'"homeassistant_version":"2025.2.0.dev0","name":"Automatic backup 2025.2.0.dev0","protected":true,"size":13916160}'
)
if filename == "abcd12ef.tar":
return MockStreamReaderChunked(b"backup data")
raise MockStreamReaderChunked(b"")
async def _mock_download_file_meta_ok_tar_missing(
path: str, filename: str
) -> MockStreamReader:
if filename == "abcd12ef_meta.json":
return MockStreamReader(
b'{"addons":[],"backup_id":"abcd12ef","date":"2025-01-09T20:14:35.457323+01:00",'
b'"database_included":true,"extra_metadata":{"instance_id":"36b3b7e984da43fc89f7bafb2645fa36",'
b'"with_automatic_settings":true},"folders":[],"homeassistant_included":true,'
b'"homeassistant_version":"2025.2.0.dev0","name":"Automatic backup 2025.2.0.dev0","protected":true,"size":13916160}'
)
if filename == "abcd12ef.tar":
raise SynologyDSMAPIErrorException("api", "404", "not found")
raise MockStreamReaderChunked(b"")
async def _mock_download_file_meta_defect(path: str, filename: str) -> MockStreamReader:
if filename == "abcd12ef_meta.json":
return MockStreamReader(b"im not a json")
if filename == "abcd12ef.tar":
return MockStreamReaderChunked(b"backup data")
raise MockStreamReaderChunked(b"")
@pytest.fixture
def mock_dsm_with_filestation():
"""Mock a successful service with filestation support."""
with patch("homeassistant.components.synology_dsm.common.SynologyDSM") as dsm:
dsm.login = AsyncMock(return_value=True)
dsm.update = AsyncMock(return_value=True)
dsm.surveillance_station.update = AsyncMock(return_value=True)
dsm.upgrade.update = AsyncMock(return_value=True)
dsm.utilisation = Mock(cpu_user_load=1, update=AsyncMock(return_value=True))
dsm.network = Mock(update=AsyncMock(return_value=True), macs=MACS)
dsm.storage = Mock(
disks_ids=["sda", "sdb", "sdc"],
volumes_ids=["volume_1"],
update=AsyncMock(return_value=True),
)
dsm.information = Mock(serial=SERIAL)
dsm.file = AsyncMock(
get_shared_folders=AsyncMock(
return_value=[
SynoFileSharedFolder(
additional=None,
is_dir=True,
name="HA Backup",
path="/ha_backup",
)
]
),
get_files=AsyncMock(
return_value=[
SynoFileFile(
additional=None,
is_dir=False,
name="abcd12ef_meta.json",
path="/ha_backup/my_backup_path/abcd12ef_meta.json",
),
SynoFileFile(
additional=None,
is_dir=False,
name="abcd12ef.tar",
path="/ha_backup/my_backup_path/abcd12ef.tar",
),
]
),
download_file=_mock_download_file,
upload_file=AsyncMock(return_value=True),
delete_file=AsyncMock(return_value=True),
)
dsm.logout = AsyncMock(return_value=True)
yield dsm
@pytest.fixture
def mock_dsm_without_filestation():
"""Mock a successful service with filestation support."""
with patch("homeassistant.components.synology_dsm.common.SynologyDSM") as dsm:
dsm.login = AsyncMock(return_value=True)
dsm.update = AsyncMock(return_value=True)
dsm.surveillance_station.update = AsyncMock(return_value=True)
dsm.upgrade.update = AsyncMock(return_value=True)
dsm.utilisation = Mock(cpu_user_load=1, update=AsyncMock(return_value=True))
dsm.network = Mock(update=AsyncMock(return_value=True), macs=MACS)
dsm.storage = Mock(
disks_ids=["sda", "sdb", "sdc"],
volumes_ids=["volume_1"],
update=AsyncMock(return_value=True),
)
dsm.information = Mock(serial=SERIAL)
dsm.file = None
yield dsm
@pytest.fixture
async def setup_dsm_with_filestation(
hass: HomeAssistant,
mock_dsm_with_filestation: MagicMock,
):
"""Mock setup of synology dsm config entry."""
with (
patch(
"homeassistant.components.synology_dsm.common.SynologyDSM",
return_value=mock_dsm_with_filestation,
),
patch("homeassistant.components.synology_dsm.PLATFORMS", return_value=[]),
):
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_SSL: USE_SSL,
CONF_USERNAME: USERNAME,
CONF_PASSWORD: PASSWORD,
CONF_MAC: MACS[0],
},
options={
CONF_BACKUP_PATH: "my_backup_path",
CONF_BACKUP_SHARE: "/ha_backup",
},
unique_id="mocked_syno_dsm_entry",
)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
assert await async_setup_component(hass, BACKUP_DOMAIN, {BACKUP_DOMAIN: {}})
await hass.async_block_till_done()
yield mock_dsm_with_filestation
async def test_agents_info(
hass: HomeAssistant,
setup_dsm_with_filestation: MagicMock,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test backup agent info."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/agents/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agents": [
{"agent_id": "synology_dsm.Mock Title"},
{"agent_id": "backup.local"},
],
}
async def test_agents_not_loaded(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test backup agent with no loaded config entry."""
with patch("homeassistant.components.backup.is_hassio", return_value=False):
assert await async_setup_component(hass, BACKUP_DOMAIN, {BACKUP_DOMAIN: {}})
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
await hass.async_block_till_done()
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/agents/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agents": [
{"agent_id": "backup.local"},
],
}
async def test_agents_on_unload(
hass: HomeAssistant,
setup_dsm_with_filestation: MagicMock,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test backup agent on un-loading config entry."""
# config entry is loaded
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/agents/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agents": [
{"agent_id": "synology_dsm.Mock Title"},
{"agent_id": "backup.local"},
],
}
# unload config entry
entries = hass.config_entries.async_loaded_entries(DOMAIN)
await hass.config_entries.async_unload(entries[0].entry_id)
await hass.async_block_till_done(wait_background_tasks=True)
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/agents/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agents": [
{"agent_id": "backup.local"},
],
}
async def test_agents_list_backups(
hass: HomeAssistant,
setup_dsm_with_filestation: MagicMock,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test agent list backups."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["agent_errors"] == {}
assert response["result"]["backups"] == [
{
"addons": [],
"backup_id": "abcd12ef",
"date": "2025-01-09T20:14:35.457323+01:00",
"database_included": True,
"folders": [],
"homeassistant_included": True,
"homeassistant_version": "2025.2.0.dev0",
"name": "Automatic backup 2025.2.0.dev0",
"protected": True,
"size": 13916160,
"agent_ids": ["synology_dsm.Mock Title"],
"failed_agent_ids": [],
"with_automatic_settings": None,
}
]
async def test_agents_list_backups_error(
hass: HomeAssistant,
setup_dsm_with_filestation: MagicMock,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test agent error while list backups."""
client = await hass_ws_client(hass)
setup_dsm_with_filestation.file.get_files.side_effect = (
SynologyDSMAPIErrorException("api", "500", "error")
)
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agent_errors": {"synology_dsm.Mock Title": "Failed to list backups"},
"backups": [],
"last_attempted_automatic_backup": None,
"last_completed_automatic_backup": None,
"next_automatic_backup": None,
"next_automatic_backup_additional": False,
}
async def test_agents_list_backups_disabled_filestation(
hass: HomeAssistant,
mock_dsm_without_filestation: MagicMock,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test agent error while list backups when file station is disabled."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert not response["success"]
@pytest.mark.parametrize(
("backup_id", "expected_result"),
[
(
"abcd12ef",
{
"addons": [],
"backup_id": "abcd12ef",
"date": "2025-01-09T20:14:35.457323+01:00",
"database_included": True,
"folders": [],
"homeassistant_included": True,
"homeassistant_version": "2025.2.0.dev0",
"name": "Automatic backup 2025.2.0.dev0",
"protected": True,
"size": 13916160,
"agent_ids": ["synology_dsm.Mock Title"],
"failed_agent_ids": [],
"with_automatic_settings": None,
},
),
(
"12345",
None,
),
],
ids=["found", "not_found"],
)
async def test_agents_get_backup(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_dsm_with_filestation: MagicMock,
backup_id: str,
expected_result: dict[str, Any] | None,
) -> None:
"""Test agent get backup."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/details", "backup_id": backup_id})
response = await client.receive_json()
assert response["success"]
assert response["result"]["agent_errors"] == {}
assert response["result"]["backup"] == expected_result
async def test_agents_get_backup_not_existing(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test agent get not existing backup."""
client = await hass_ws_client(hass)
backup_id = "ef34ab12"
setup_dsm_with_filestation.file.download_file = AsyncMock(
side_effect=SynologyDSMAPIErrorException("api", "404", "not found")
)
await client.send_json_auto_id({"type": "backup/details", "backup_id": backup_id})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}, "backup": None}
async def test_agents_get_backup_error(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test agent error while get backup."""
client = await hass_ws_client(hass)
backup_id = "ef34ab12"
setup_dsm_with_filestation.file.get_files.side_effect = (
SynologyDSMAPIErrorException("api", "500", "error")
)
await client.send_json_auto_id({"type": "backup/details", "backup_id": backup_id})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agent_errors": {"synology_dsm.Mock Title": "Failed to list backups"},
"backup": None,
}
async def test_agents_get_backup_defect_meta(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test agent error while get backup."""
client = await hass_ws_client(hass)
backup_id = "ef34ab12"
setup_dsm_with_filestation.file.download_file = _mock_download_file_meta_defect
await client.send_json_auto_id({"type": "backup/details", "backup_id": backup_id})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}, "backup": None}
async def test_agents_download(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test agent download backup."""
client = await hass_client()
backup_id = "abcd12ef"
resp = await client.get(
f"/api/backup/download/{backup_id}?agent_id=synology_dsm.Mock Title"
)
assert resp.status == 200
assert await resp.content.read() == b"backup data"
async def test_agents_download_not_existing(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test agent download not existing backup."""
client = await hass_client()
backup_id = "abcd12ef"
setup_dsm_with_filestation.file.download_file = (
_mock_download_file_meta_ok_tar_missing
)
resp = await client.get(
f"/api/backup/download/{backup_id}?agent_id=synology_dsm.Mock Title"
)
assert resp.reason == "Internal Server Error"
assert resp.status == 500
async def test_agents_upload(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
caplog: pytest.LogCaptureFixture,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test agent upload backup."""
client = await hass_client()
backup_id = "test-backup"
test_backup = AgentBackup(
addons=[AddonInfo(name="Test", slug="test", version="1.0.0")],
backup_id=backup_id,
database_included=True,
date="1970-01-01T00:00:00.000Z",
extra_metadata={},
folders=[Folder.MEDIA, Folder.SHARE],
homeassistant_included=True,
homeassistant_version="2024.12.0",
name="Test",
protected=True,
size=0,
)
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
) as fetch_backup,
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=test_backup,
),
patch("pathlib.Path.open") as mocked_open,
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
fetch_backup.return_value = test_backup
resp = await client.post(
"/api/backup/upload?agent_id=synology_dsm.Mock Title",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert f"Uploading backup {backup_id}" in caplog.text
mock: AsyncMock = setup_dsm_with_filestation.file.upload_file
assert len(mock.mock_calls) == 2
assert mock.call_args_list[0].kwargs["filename"] == "test-backup.tar"
assert mock.call_args_list[0].kwargs["path"] == "/ha_backup/my_backup_path"
assert mock.call_args_list[1].kwargs["filename"] == "test-backup_meta.json"
assert mock.call_args_list[1].kwargs["path"] == "/ha_backup/my_backup_path"
async def test_agents_upload_error(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
caplog: pytest.LogCaptureFixture,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test agent error while uploading backup."""
client = await hass_client()
backup_id = "test-backup"
test_backup = AgentBackup(
addons=[AddonInfo(name="Test", slug="test", version="1.0.0")],
backup_id=backup_id,
database_included=True,
date="1970-01-01T00:00:00.000Z",
extra_metadata={},
folders=[Folder.MEDIA, Folder.SHARE],
homeassistant_included=True,
homeassistant_version="2024.12.0",
name="Test",
protected=True,
size=0,
)
# fail to upload the tar file
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
) as fetch_backup,
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=test_backup,
),
patch("pathlib.Path.open") as mocked_open,
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
fetch_backup.return_value = test_backup
setup_dsm_with_filestation.file.upload_file.side_effect = (
SynologyDSMAPIErrorException("api", "500", "error")
)
resp = await client.post(
"/api/backup/upload?agent_id=synology_dsm.Mock Title",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert f"Uploading backup {backup_id}" in caplog.text
assert "Failed to upload backup" in caplog.text
mock: AsyncMock = setup_dsm_with_filestation.file.upload_file
assert len(mock.mock_calls) == 1
assert mock.call_args_list[0].kwargs["filename"] == "test-backup.tar"
assert mock.call_args_list[0].kwargs["path"] == "/ha_backup/my_backup_path"
# fail to upload the meta json file
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
) as fetch_backup,
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=test_backup,
),
patch("pathlib.Path.open") as mocked_open,
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
fetch_backup.return_value = test_backup
setup_dsm_with_filestation.file.upload_file.side_effect = [
True,
SynologyDSMAPIErrorException("api", "500", "error"),
]
resp = await client.post(
"/api/backup/upload?agent_id=synology_dsm.Mock Title",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert f"Uploading backup {backup_id}" in caplog.text
assert "Failed to upload backup" in caplog.text
mock: AsyncMock = setup_dsm_with_filestation.file.upload_file
assert len(mock.mock_calls) == 3
assert mock.call_args_list[1].kwargs["filename"] == "test-backup.tar"
assert mock.call_args_list[1].kwargs["path"] == "/ha_backup/my_backup_path"
assert mock.call_args_list[2].kwargs["filename"] == "test-backup_meta.json"
assert mock.call_args_list[2].kwargs["path"] == "/ha_backup/my_backup_path"
async def test_agents_delete(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test agent delete backup."""
client = await hass_ws_client(hass)
backup_id = "abcd12ef"
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": backup_id,
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}}
mock: AsyncMock = setup_dsm_with_filestation.file.delete_file
assert len(mock.mock_calls) == 2
assert mock.call_args_list[0].kwargs["filename"] == "abcd12ef.tar"
assert mock.call_args_list[0].kwargs["path"] == "/ha_backup/my_backup_path"
assert mock.call_args_list[1].kwargs["filename"] == "abcd12ef_meta.json"
assert mock.call_args_list[1].kwargs["path"] == "/ha_backup/my_backup_path"
async def test_agents_delete_not_existing(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test delete not existing backup."""
client = await hass_ws_client(hass)
backup_id = "ef34ab12"
setup_dsm_with_filestation.file.delete_file = AsyncMock(
side_effect=SynologyDSMAPIErrorException("api", "404", "not found")
)
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": backup_id,
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agent_errors": {"synology_dsm.Mock Title": "Failed to delete the backup"}
}
async def test_agents_delete_error(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_dsm_with_filestation: MagicMock,
) -> None:
"""Test error while delete backup."""
client = await hass_ws_client(hass)
# error while delete
backup_id = "abcd12ef"
setup_dsm_with_filestation.file.delete_file.side_effect = (
SynologyDSMAPIErrorException("api", "404", "not found")
)
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": backup_id,
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agent_errors": {"synology_dsm.Mock Title": "Failed to delete the backup"}
}
mock: AsyncMock = setup_dsm_with_filestation.file.delete_file
assert len(mock.mock_calls) == 1
assert mock.call_args_list[0].kwargs["filename"] == "abcd12ef.tar"
assert mock.call_args_list[0].kwargs["path"] == "/ha_backup/my_backup_path"

View File

@ -4,6 +4,7 @@ from ipaddress import ip_address
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from synology_dsm.api.file_station.models import SynoFileSharedFolder
from synology_dsm.exceptions import (
SynologyDSMException,
SynologyDSMLogin2SAFailedException,
@ -15,9 +16,9 @@ from syrupy import SnapshotAssertion
from homeassistant.components.synology_dsm.config_flow import CONF_OTP_CODE
from homeassistant.components.synology_dsm.const import (
CONF_BACKUP_PATH,
CONF_BACKUP_SHARE,
CONF_SNAPSHOT_QUALITY,
DEFAULT_SCAN_INTERVAL,
DEFAULT_SNAPSHOT_QUALITY,
DOMAIN,
)
from homeassistant.config_entries import SOURCE_SSDP, SOURCE_USER, SOURCE_ZEROCONF
@ -73,7 +74,7 @@ def mock_controller_service():
update=AsyncMock(return_value=True),
)
dsm.information = Mock(serial=SERIAL)
dsm.file = AsyncMock(get_shared_folders=AsyncMock(return_value=None))
yield dsm
@ -96,6 +97,7 @@ def mock_controller_service_2sa():
update=AsyncMock(return_value=True),
)
dsm.information = Mock(serial=SERIAL)
dsm.file = AsyncMock(get_shared_folders=AsyncMock(return_value=None))
yield dsm
@ -116,6 +118,39 @@ def mock_controller_service_vdsm():
update=AsyncMock(return_value=True),
)
dsm.information = Mock(serial=SERIAL)
dsm.file = AsyncMock(get_shared_folders=AsyncMock(return_value=None))
yield dsm
@pytest.fixture(name="service_with_filestation")
def mock_controller_service_with_filestation():
"""Mock a successful service with filestation support."""
with patch("homeassistant.components.synology_dsm.config_flow.SynologyDSM") as dsm:
dsm.login = AsyncMock(return_value=True)
dsm.update = AsyncMock(return_value=True)
dsm.surveillance_station.update = AsyncMock(return_value=True)
dsm.upgrade.update = AsyncMock(return_value=True)
dsm.utilisation = Mock(cpu_user_load=1, update=AsyncMock(return_value=True))
dsm.network = Mock(update=AsyncMock(return_value=True), macs=MACS)
dsm.storage = Mock(
disks_ids=["sda", "sdb", "sdc"],
volumes_ids=["volume_1"],
update=AsyncMock(return_value=True),
)
dsm.information = Mock(serial=SERIAL)
dsm.file = AsyncMock(
get_shared_folders=AsyncMock(
return_value=[
SynoFileSharedFolder(
additional=None,
is_dir=True,
name="HA Backup",
path="/ha_backup",
)
]
)
)
yield dsm
@ -137,7 +172,7 @@ def mock_controller_service_failed():
update=AsyncMock(return_value=True),
)
dsm.information = Mock(serial=None)
dsm.file = AsyncMock(get_shared_folders=AsyncMock(return_value=None))
yield dsm
@ -283,6 +318,55 @@ async def test_user_vdsm(
assert result["data"] == snapshot
@pytest.mark.usefixtures("mock_setup_entry")
async def test_user_with_filestation(
hass: HomeAssistant,
service_with_filestation: MagicMock,
snapshot: SnapshotAssertion,
) -> None:
"""Test user config."""
with patch(
"homeassistant.components.synology_dsm.config_flow.SynologyDSM",
return_value=service_with_filestation,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}, data=None
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
with patch(
"homeassistant.components.synology_dsm.config_flow.SynologyDSM",
return_value=service_with_filestation,
):
# test with all provided
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data={
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_SSL: USE_SSL,
CONF_VERIFY_SSL: VERIFY_SSL,
CONF_USERNAME: USERNAME,
CONF_PASSWORD: PASSWORD,
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "backup_share"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_BACKUP_SHARE: "/ha_backup", CONF_BACKUP_PATH: "automatic_ha_backups"},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["result"].unique_id == SERIAL
assert result["title"] == HOST
assert result["data"] == snapshot
@pytest.mark.usefixtures("mock_setup_entry")
async def test_reauth(hass: HomeAssistant, service: MagicMock) -> None:
"""Test reauthentication."""
@ -560,46 +644,54 @@ async def test_existing_ssdp(hass: HomeAssistant, service: MagicMock) -> None:
assert result["reason"] == "already_configured"
@pytest.mark.usefixtures("mock_setup_entry")
async def test_options_flow(hass: HomeAssistant, service: MagicMock) -> None:
async def test_options_flow(
hass: HomeAssistant, service_with_filestation: MagicMock
) -> None:
"""Test config flow options."""
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: HOST,
CONF_USERNAME: USERNAME,
CONF_PASSWORD: PASSWORD,
CONF_MAC: MACS,
},
unique_id=SERIAL,
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.synology_dsm.common.SynologyDSM",
return_value=service_with_filestation,
),
patch("homeassistant.components.synology_dsm.PLATFORMS", return_value=[]),
):
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_SSL: USE_SSL,
CONF_USERNAME: USERNAME,
CONF_PASSWORD: PASSWORD,
CONF_MAC: MACS[0],
},
unique_id=SERIAL,
)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.options == {}
assert config_entry.options == {CONF_BACKUP_SHARE: None, CONF_BACKUP_PATH: None}
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "init"
# Scan interval
# Default
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert config_entry.options[CONF_SCAN_INTERVAL] == DEFAULT_SCAN_INTERVAL
assert config_entry.options[CONF_SNAPSHOT_QUALITY] == DEFAULT_SNAPSHOT_QUALITY
# Manual
result = await hass.config_entries.options.async_init(config_entry.entry_id)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={CONF_SCAN_INTERVAL: 2, CONF_SNAPSHOT_QUALITY: 0},
user_input={
CONF_SCAN_INTERVAL: 2,
CONF_SNAPSHOT_QUALITY: 0,
CONF_BACKUP_PATH: "my_nackup_path",
CONF_BACKUP_SHARE: "/ha_backup",
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert config_entry.options[CONF_SCAN_INTERVAL] == 2
assert config_entry.options[CONF_SNAPSHOT_QUALITY] == 0
assert config_entry.options[CONF_BACKUP_PATH] == "my_nackup_path"
assert config_entry.options[CONF_BACKUP_SHARE] == "/ha_backup"
@pytest.mark.usefixtures("mock_setup_entry")

View File

@ -4,7 +4,13 @@ from unittest.mock import MagicMock, patch
from synology_dsm.exceptions import SynologyDSMLoginInvalidException
from homeassistant.components.synology_dsm.const import DOMAIN, SERVICES
from homeassistant.components.synology_dsm.const import (
CONF_BACKUP_PATH,
CONF_BACKUP_SHARE,
DEFAULT_VERIFY_SSL,
DOMAIN,
SERVICES,
)
from homeassistant.const import (
CONF_HOST,
CONF_MAC,
@ -12,6 +18,7 @@ from homeassistant.const import (
CONF_PORT,
CONF_SSL,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
@ -78,3 +85,38 @@ async def test_reauth_triggered(hass: HomeAssistant) -> None:
assert not await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
mock_async_step_reauth.assert_called_once()
async def test_config_entry_migrations(
hass: HomeAssistant, mock_dsm: MagicMock
) -> None:
"""Test if reauthentication flow is triggered."""
with (
patch(
"homeassistant.components.synology_dsm.common.SynologyDSM",
return_value=mock_dsm,
),
patch("homeassistant.components.synology_dsm.PLATFORMS", return_value=[]),
):
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: HOST,
CONF_PORT: PORT,
CONF_SSL: USE_SSL,
CONF_USERNAME: USERNAME,
CONF_PASSWORD: PASSWORD,
CONF_MAC: MACS[0],
},
)
entry.add_to_hass(hass)
assert CONF_VERIFY_SSL not in entry.data
assert CONF_BACKUP_SHARE not in entry.options
assert CONF_BACKUP_PATH not in entry.options
assert await hass.config_entries.async_setup(entry.entry_id)
assert entry.data[CONF_VERIFY_SSL] == DEFAULT_VERIFY_SSL
assert entry.options[CONF_BACKUP_SHARE] is None
assert entry.options[CONF_BACKUP_PATH] is None

View File

@ -62,6 +62,7 @@ def dsm_with_photos() -> MagicMock:
dsm.photos.get_item_thumbnail_url = AsyncMock(
return_value="http://my.thumbnail.url"
)
dsm.file = AsyncMock(get_shared_folders=AsyncMock(return_value=None))
return dsm