"""Provide persistent configuration for the backup integration.""" from __future__ import annotations from dataclasses import dataclass, field, replace import datetime as dt from datetime import datetime, timedelta from enum import StrEnum import random from typing import TYPE_CHECKING, Self, TypedDict from cronsim import CronSim from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.event import async_call_later, async_track_point_in_time from homeassistant.helpers.typing import UNDEFINED, UndefinedType from homeassistant.util import dt as dt_util from .const import LOGGER from .models import BackupManagerError, Folder if TYPE_CHECKING: from .manager import BackupManager, ManagerBackup CRON_PATTERN_DAILY = "{m} {h} * * *" CRON_PATTERN_WEEKLY = "{m} {h} * * {d}" # The default time for automatic backups to run is at 04:45. # This time is chosen to be compatible with the time of the recorder's # nightly job which runs at 04:12. DEFAULT_BACKUP_TIME = dt.time(4, 45) # Randomize the start time of the backup by up to 60 minutes to avoid # all backups running at the same time. BACKUP_START_TIME_JITTER = 60 * 60 class StoredBackupConfig(TypedDict): """Represent the stored backup config.""" agents: dict[str, StoredAgentConfig] create_backup: StoredCreateBackupConfig last_attempted_automatic_backup: str | None last_completed_automatic_backup: str | None retention: StoredRetentionConfig schedule: StoredBackupSchedule @dataclass(kw_only=True) class BackupConfigData: """Represent loaded backup config data.""" agents: dict[str, AgentConfig] create_backup: CreateBackupConfig last_attempted_automatic_backup: datetime | None = None last_completed_automatic_backup: datetime | None = None retention: RetentionConfig schedule: BackupSchedule @classmethod def from_dict(cls, data: StoredBackupConfig) -> Self: """Initialize backup config data from a dict.""" include_folders_data = data["create_backup"]["include_folders"] if include_folders_data: include_folders = [Folder(folder) for folder in include_folders_data] else: include_folders = None retention = data["retention"] if last_attempted_str := data["last_attempted_automatic_backup"]: last_attempted = dt_util.parse_datetime(last_attempted_str) else: last_attempted = None if last_attempted_str := data["last_completed_automatic_backup"]: last_completed = dt_util.parse_datetime(last_attempted_str) else: last_completed = None if time_str := data["schedule"]["time"]: time = dt_util.parse_time(time_str) else: time = None days = [Day(day) for day in data["schedule"]["days"]] return cls( agents={ agent_id: AgentConfig(protected=agent_data["protected"]) for agent_id, agent_data in data["agents"].items() }, create_backup=CreateBackupConfig( agent_ids=data["create_backup"]["agent_ids"], include_addons=data["create_backup"]["include_addons"], include_all_addons=data["create_backup"]["include_all_addons"], include_database=data["create_backup"]["include_database"], include_folders=include_folders, name=data["create_backup"]["name"], password=data["create_backup"]["password"], ), last_attempted_automatic_backup=last_attempted, last_completed_automatic_backup=last_completed, retention=RetentionConfig( copies=retention["copies"], days=retention["days"], ), schedule=BackupSchedule( days=days, recurrence=ScheduleRecurrence(data["schedule"]["recurrence"]), state=ScheduleState(data["schedule"].get("state", ScheduleState.NEVER)), time=time, ), ) def to_dict(self) -> StoredBackupConfig: """Convert backup config data to a dict.""" if self.last_attempted_automatic_backup: last_attempted = self.last_attempted_automatic_backup.isoformat() else: last_attempted = None if self.last_completed_automatic_backup: last_completed = self.last_completed_automatic_backup.isoformat() else: last_completed = None return StoredBackupConfig( agents={ agent_id: agent.to_dict() for agent_id, agent in self.agents.items() }, create_backup=self.create_backup.to_dict(), last_attempted_automatic_backup=last_attempted, last_completed_automatic_backup=last_completed, retention=self.retention.to_dict(), schedule=self.schedule.to_dict(), ) class BackupConfig: """Handle backup config.""" def __init__(self, hass: HomeAssistant, manager: BackupManager) -> None: """Initialize backup config.""" self.data = BackupConfigData( agents={}, create_backup=CreateBackupConfig(), retention=RetentionConfig(), schedule=BackupSchedule(), ) self._manager = manager def load(self, stored_config: StoredBackupConfig) -> None: """Load config.""" self.data = BackupConfigData.from_dict(stored_config) self.data.retention.apply(self._manager) self.data.schedule.apply(self._manager) async def update( self, *, agents: dict[str, AgentParametersDict] | UndefinedType = UNDEFINED, create_backup: CreateBackupParametersDict | UndefinedType = UNDEFINED, retention: RetentionParametersDict | UndefinedType = UNDEFINED, schedule: ScheduleParametersDict | UndefinedType = UNDEFINED, ) -> None: """Update config.""" if agents is not UNDEFINED: for agent_id, agent_config in agents.items(): if agent_id not in self.data.agents: self.data.agents[agent_id] = AgentConfig(**agent_config) else: self.data.agents[agent_id] = replace( self.data.agents[agent_id], **agent_config ) if create_backup is not UNDEFINED: self.data.create_backup = replace(self.data.create_backup, **create_backup) if retention is not UNDEFINED: new_retention = RetentionConfig(**retention) if new_retention != self.data.retention: self.data.retention = new_retention self.data.retention.apply(self._manager) if schedule is not UNDEFINED: new_schedule = BackupSchedule(**schedule) if new_schedule.to_dict() != self.data.schedule.to_dict(): self.data.schedule = new_schedule self.data.schedule.apply(self._manager) self._manager.store.save() @dataclass(kw_only=True) class AgentConfig: """Represent the config for an agent.""" protected: bool def to_dict(self) -> StoredAgentConfig: """Convert agent config to a dict.""" return { "protected": self.protected, } class StoredAgentConfig(TypedDict): """Represent the stored config for an agent.""" protected: bool class AgentParametersDict(TypedDict, total=False): """Represent the parameters for an agent.""" protected: bool @dataclass(kw_only=True) class RetentionConfig: """Represent the backup retention configuration.""" copies: int | None = None days: int | None = None def apply(self, manager: BackupManager) -> None: """Apply backup retention configuration.""" if self.days is not None: LOGGER.debug( "Scheduling next automatic delete of backups older than %s in 1 day", self.days, ) self._schedule_next(manager) else: LOGGER.debug("Unscheduling next automatic delete") self._unschedule_next(manager) def to_dict(self) -> StoredRetentionConfig: """Convert backup retention configuration to a dict.""" return StoredRetentionConfig( copies=self.copies, days=self.days, ) @callback def _schedule_next( self, manager: BackupManager, ) -> None: """Schedule the next delete after days.""" self._unschedule_next(manager) async def _delete_backups(now: datetime) -> None: """Delete backups older than days.""" self._schedule_next(manager) def _delete_filter( backups: dict[str, ManagerBackup], ) -> dict[str, ManagerBackup]: """Return backups older than days to delete.""" # we need to check here since we await before # this filter is applied if self.days is None: return {} now = dt_util.utcnow() return { backup_id: backup for backup_id, backup in backups.items() if dt_util.parse_datetime(backup.date, raise_on_error=True) + timedelta(days=self.days) < now } await manager.async_delete_filtered_backups( include_filter=_automatic_backups_filter, delete_filter=_delete_filter ) manager.remove_next_delete_event = async_call_later( manager.hass, timedelta(days=1), _delete_backups ) @callback def _unschedule_next(self, manager: BackupManager) -> None: """Unschedule the next delete after days.""" if (remove_next_event := manager.remove_next_delete_event) is not None: remove_next_event() manager.remove_next_delete_event = None class StoredRetentionConfig(TypedDict): """Represent the stored backup retention configuration.""" copies: int | None days: int | None class RetentionParametersDict(TypedDict, total=False): """Represent the parameters for retention.""" copies: int | None days: int | None class StoredBackupSchedule(TypedDict): """Represent the stored backup schedule configuration.""" days: list[Day] recurrence: ScheduleRecurrence state: ScheduleState time: str | None class ScheduleParametersDict(TypedDict, total=False): """Represent parameters for backup schedule.""" days: list[Day] recurrence: ScheduleRecurrence state: ScheduleState time: dt.time | None class Day(StrEnum): """Represent the day(s) in a custom schedule recurrence.""" MONDAY = "mon" TUESDAY = "tue" WEDNESDAY = "wed" THURSDAY = "thu" FRIDAY = "fri" SATURDAY = "sat" SUNDAY = "sun" class ScheduleRecurrence(StrEnum): """Represent the schedule recurrence.""" NEVER = "never" DAILY = "daily" CUSTOM_DAYS = "custom_days" class ScheduleState(StrEnum): """Represent the schedule recurrence. This is deprecated and can be remove in HA Core 2025.8. """ NEVER = "never" DAILY = "daily" MONDAY = "mon" TUESDAY = "tue" WEDNESDAY = "wed" THURSDAY = "thu" FRIDAY = "fri" SATURDAY = "sat" SUNDAY = "sun" @dataclass(kw_only=True) class BackupSchedule: """Represent the backup schedule.""" days: list[Day] = field(default_factory=list) recurrence: ScheduleRecurrence = ScheduleRecurrence.NEVER # Although no longer used, state is kept for backwards compatibility. # It can be removed in HA Core 2025.8. state: ScheduleState = ScheduleState.NEVER time: dt.time | None = None cron_event: CronSim | None = field(init=False, default=None) next_automatic_backup: datetime | None = field(init=False, default=None) next_automatic_backup_additional = False @callback def apply( self, manager: BackupManager, ) -> None: """Apply a new schedule. There are only three possible recurrence types: never, daily, or custom_days """ if self.recurrence is ScheduleRecurrence.NEVER or ( self.recurrence is ScheduleRecurrence.CUSTOM_DAYS and not self.days ): self._unschedule_next(manager) return time = self.time if self.time is not None else DEFAULT_BACKUP_TIME if self.recurrence is ScheduleRecurrence.DAILY: self._schedule_next( CRON_PATTERN_DAILY.format(m=time.minute, h=time.hour), manager, ) else: # ScheduleRecurrence.CUSTOM_DAYS self._schedule_next( CRON_PATTERN_WEEKLY.format( m=time.minute, h=time.hour, d=",".join(day.value for day in self.days), ), manager, ) @callback def _schedule_next( self, cron_pattern: str, manager: BackupManager, ) -> None: """Schedule the next backup.""" self._unschedule_next(manager) now = dt_util.now() if (cron_event := self.cron_event) is None: seed_time = manager.config.data.last_completed_automatic_backup or now cron_event = self.cron_event = CronSim(cron_pattern, seed_time) next_time = next(cron_event) if next_time < now: # schedule a backup at next daily time once # if we missed the last scheduled backup time = self.time if self.time is not None else DEFAULT_BACKUP_TIME cron_event = CronSim( CRON_PATTERN_DAILY.format(m=time.minute, h=time.hour), now ) next_time = next(cron_event) # reseed the cron event attribute # add a day to the next time to avoid scheduling at the same time again self.cron_event = CronSim(cron_pattern, now + timedelta(days=1)) # Compare the computed next time with the next time from the cron pattern # to determine if an additional backup has been scheduled cron_event_configured = CronSim(cron_pattern, now) next_configured_time = next(cron_event_configured) self.next_automatic_backup_additional = next_time < next_configured_time else: self.next_automatic_backup_additional = False async def _create_backup(now: datetime) -> None: """Create backup.""" manager.remove_next_backup_event = None self._schedule_next(cron_pattern, manager) # create the backup try: await manager.async_create_automatic_backup() except BackupManagerError as err: LOGGER.error("Error creating backup: %s", err) except Exception: # noqa: BLE001 LOGGER.exception("Unexpected error creating automatic backup") if self.time is None: # randomize the start time of the backup by up to 60 minutes if the time is # not set to avoid all backups running at the same time next_time += timedelta(seconds=random.randint(0, BACKUP_START_TIME_JITTER)) LOGGER.debug("Scheduling next automatic backup at %s", next_time) self.next_automatic_backup = next_time manager.remove_next_backup_event = async_track_point_in_time( manager.hass, _create_backup, next_time ) def to_dict(self) -> StoredBackupSchedule: """Convert backup schedule to a dict.""" return StoredBackupSchedule( days=self.days, recurrence=self.recurrence, state=self.state, time=self.time.isoformat() if self.time else None, ) @callback def _unschedule_next(self, manager: BackupManager) -> None: """Unschedule the next backup.""" self.next_automatic_backup = None if (remove_next_event := manager.remove_next_backup_event) is not None: remove_next_event() manager.remove_next_backup_event = None @dataclass(kw_only=True) class CreateBackupConfig: """Represent the config for async_create_backup.""" agent_ids: list[str] = field(default_factory=list) include_addons: list[str] | None = None include_all_addons: bool = False include_database: bool = True include_folders: list[Folder] | None = None name: str | None = None password: str | None = None def to_dict(self) -> StoredCreateBackupConfig: """Convert create backup config to a dict.""" return { "agent_ids": self.agent_ids, "include_addons": self.include_addons, "include_all_addons": self.include_all_addons, "include_database": self.include_database, "include_folders": self.include_folders, "name": self.name, "password": self.password, } class StoredCreateBackupConfig(TypedDict): """Represent the stored config for async_create_backup.""" agent_ids: list[str] include_addons: list[str] | None include_all_addons: bool include_database: bool include_folders: list[Folder] | None name: str | None password: str | None class CreateBackupParametersDict(TypedDict, total=False): """Represent the parameters for async_create_backup.""" agent_ids: list[str] include_addons: list[str] | None include_all_addons: bool include_database: bool include_folders: list[Folder] | None name: str | None password: str | None def _automatic_backups_filter( backups: dict[str, ManagerBackup], ) -> dict[str, ManagerBackup]: """Return automatic backups.""" return { backup_id: backup for backup_id, backup in backups.items() if backup.with_automatic_settings } async def delete_backups_exceeding_configured_count(manager: BackupManager) -> None: """Delete backups exceeding the configured retention count.""" def _delete_filter( backups: dict[str, ManagerBackup], ) -> dict[str, ManagerBackup]: """Return oldest backups more numerous than copies to delete.""" # we need to check here since we await before # this filter is applied if manager.config.data.retention.copies is None: return {} return dict( sorted( backups.items(), key=lambda backup_item: backup_item[1].date, )[: max(len(backups) - manager.config.data.retention.copies, 0)] ) await manager.async_delete_filtered_backups( include_filter=_automatic_backups_filter, delete_filter=_delete_filter )