Add progress syncing from child jobs (#6207)

* Add progress syncing from child jobs

* Fix pylint issue

* Set initial progress from parent and end at 100
This commit is contained in:
Mike Degatano
2025-09-30 14:52:16 -04:00
committed by GitHub
parent ab3b147876
commit 64f94a159c
6 changed files with 215 additions and 35 deletions

View File

@@ -1,5 +1,7 @@
"""Supervisor job manager."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine, Generator
from contextlib import contextmanager, suppress
@@ -10,6 +12,7 @@ import logging
from typing import Any, Self
from uuid import uuid4
from attr.validators import gt, lt
from attrs import Attribute, define, field
from attrs.setters import convert as attr_convert, frozen, validate as attr_validate
from attrs.validators import ge, le
@@ -47,13 +50,13 @@ def _remove_current_job(context: Context) -> Context:
return context
def _invalid_if_done(instance: "SupervisorJob", *_) -> None:
def _invalid_if_done(instance: SupervisorJob, *_) -> None:
"""Validate that job is not done."""
if instance.done:
raise ValueError("Cannot update a job that is done")
def _on_change(instance: "SupervisorJob", attribute: Attribute, value: Any) -> Any:
def _on_change(instance: SupervisorJob, attribute: Attribute, value: Any) -> Any:
"""Forward a change to a field on to the listener if defined."""
value = attr_convert(instance, attribute, value)
value = attr_validate(instance, attribute, value)
@@ -62,12 +65,34 @@ def _on_change(instance: "SupervisorJob", attribute: Attribute, value: Any) -> A
return value
def _invalid_if_started(instance: "SupervisorJob", *_) -> None:
def _invalid_if_started(instance: SupervisorJob, *_) -> None:
"""Validate that job has not been started."""
if instance.done is not None:
raise ValueError("Field cannot be updated once job has started")
@define(frozen=True)
class ChildJobSyncFilter:
    """Filter describing which child jobs should sync progress into a parent.

    A child matches when its name equals ``name`` and its reference either
    equals ``reference`` or ``reference`` is the DEFAULT wildcard.
    """

    name: str
    reference: str | None | type[DEFAULT] = DEFAULT
    # Fraction (0, 1] of the parent's total progress this child contributes
    progress_allocation: float = field(default=1.0, validator=[gt(0.0), le(1.0)])

    def matches(self, job: SupervisorJob) -> bool:
        """Return True if the given job is selected by this filter."""
        if job.name != self.name:
            return False
        # DEFAULT acts as a wildcard accepting any reference
        return self.reference in (DEFAULT, job.reference)
@define(frozen=True)
class ParentJobSync:
    """Parent job sync details."""

    # UUID of the parent job that receives the synced progress
    uuid: str
    # Parent's progress at the moment the child job started; child progress
    # is scaled and added on top of this baseline (must stay below 100)
    starting_progress: float = field(validator=[ge(0.0), lt(100.0)])
    # Fraction (0, 1] of the parent's total progress allocated to the child
    progress_allocation: float = field(validator=[gt(0.0), le(1.0)])
@define
class SupervisorJobError:
"""Representation of an error occurring during a supervisor job."""
@@ -103,13 +128,15 @@ class SupervisorJob:
)
parent_id: str | None = field(factory=_CURRENT_JOB.get, on_setattr=frozen)
done: bool | None = field(init=False, default=None, on_setattr=_on_change)
on_change: Callable[["SupervisorJob", Attribute, Any], None] | None = None
on_change: Callable[[SupervisorJob, Attribute, Any], None] | None = None
internal: bool = field(default=False)
errors: list[SupervisorJobError] = field(
init=False, factory=list, on_setattr=_on_change
)
release_event: asyncio.Event | None = None
extra: dict[str, Any] | None = None
child_job_syncs: list[ChildJobSyncFilter] | None = None
parent_job_syncs: list[ParentJobSync] = field(init=False, factory=list)
def as_dict(self) -> dict[str, Any]:
"""Return dictionary representation."""
@@ -152,8 +179,14 @@ class SupervisorJob:
try:
token = _CURRENT_JOB.set(self.uuid)
yield self
# Cannot have an else without an except so we do nothing and re-raise
except: # noqa: TRY203
raise
else:
self.update(progress=100, done=True)
finally:
self.done = True
if not self.done:
self.done = True
if token:
_CURRENT_JOB.reset(token)
@@ -174,12 +207,14 @@ class SupervisorJob:
self.stage = stage
if extra != DEFAULT:
self.extra = extra
# Done has special event. use that to trigger on change if included
# If not then just use any other field to trigger
self.on_change = on_change
if done is not None:
self.done = done
self.on_change = on_change
# Just triggers the normal on change
self.reference = self.reference
else:
self.reference = self.reference
class JobManager(FileConfiguration, CoreSysAttributes):
@@ -225,16 +260,35 @@ class JobManager(FileConfiguration, CoreSysAttributes):
"""Return true if there is an active job for the current asyncio task."""
return _CURRENT_JOB.get() is not None
def _notify_on_job_change(
def _on_job_change(
self, job: SupervisorJob, attribute: Attribute, value: Any
) -> None:
"""Notify Home Assistant of a change to a job and bus on job start/end."""
"""Take on change actions such as notify home assistant and sync progress."""
# Job object will be before the change. Combine the change with current data
if attribute.name == "errors":
value = [err.as_dict() for err in value]
job_data = job.as_dict() | {attribute.name: value}
self.sys_homeassistant.websocket.supervisor_event(
WSEvent.JOB, job.as_dict() | {attribute.name: value}
)
# Notify Home Assistant of change if its not internal
if not job.internal:
self.sys_homeassistant.websocket.supervisor_event(WSEvent.JOB, job_data)
# If we have any parent job syncs, sync progress to them
for sync in job.parent_job_syncs:
try:
parent_job = self.get_job(sync.uuid)
except JobNotFound:
# Shouldn't happen but failure to find a parent for progress
# reporting shouldn't raise and break the active job
continue
progress = sync.starting_progress + (
sync.progress_allocation * job_data["progress"]
)
# Using max would always trigger on change even if progress was unchanged
# pylint: disable-next=R1731
if parent_job.progress < progress: # noqa: PLR1730
parent_job.progress = progress
if attribute.name == "done":
if value is False:
@@ -249,16 +303,41 @@ class JobManager(FileConfiguration, CoreSysAttributes):
initial_stage: str | None = None,
internal: bool = False,
parent_id: str | None = DEFAULT, # type: ignore
child_job_syncs: list[ChildJobSyncFilter] | None = None,
) -> SupervisorJob:
"""Create a new job."""
job = SupervisorJob(
name,
reference=reference,
stage=initial_stage,
on_change=None if internal else self._notify_on_job_change,
on_change=self._on_job_change,
internal=internal,
child_job_syncs=child_job_syncs,
**({} if parent_id == DEFAULT else {"parent_id": parent_id}), # type: ignore
)
# Shouldn't happen but inability to find a parent for progress reporting
# shouldn't raise and break the active job
with suppress(JobNotFound):
curr_parent = job
while curr_parent.parent_id:
curr_parent = self.get_job(curr_parent.parent_id)
if not curr_parent.child_job_syncs:
continue
# Break after first match at each parent as it doesn't make sense
# to match twice. But it could match multiple parents
for sync in curr_parent.child_job_syncs:
if sync.matches(job):
job.parent_job_syncs.append(
ParentJobSync(
curr_parent.uuid,
starting_progress=curr_parent.progress,
progress_allocation=sync.progress_allocation,
)
)
break
self._jobs[job.uuid] = job
return job

View File

@@ -24,7 +24,7 @@ from ..resolution.const import (
UnsupportedReason,
)
from ..utils.sentry import async_capture_exception
from . import SupervisorJob
from . import ChildJobSyncFilter, SupervisorJob
from .const import JobConcurrency, JobCondition, JobThrottle
from .job_group import JobGroup
@@ -48,6 +48,7 @@ class Job(CoreSysAttributes):
| None = None,
throttle_max_calls: int | None = None,
internal: bool = False,
child_job_syncs: list[ChildJobSyncFilter] | None = None,
): # pylint: disable=too-many-positional-arguments
"""Initialize the Job decorator.
@@ -61,6 +62,7 @@ class Job(CoreSysAttributes):
throttle_period (timedelta | Callable | None): Throttle period as a timedelta or a callable returning a timedelta (for throttled jobs).
throttle_max_calls (int | None): Maximum number of calls allowed within the throttle period (for rate-limited jobs).
internal (bool): Whether the job is internal (not exposed through the Supervisor API). Defaults to False.
child_job_syncs (list[ChildJobSyncFilter] | None): Use if the job's progress should be kept in sync with the progress of one or more of its child jobs.
Raises:
RuntimeError: If job name is not unique, or required throttle parameters are missing for the selected throttle policy.
@@ -80,6 +82,7 @@ class Job(CoreSysAttributes):
self._last_call: dict[str | None, datetime] = {}
self._rate_limited_calls: dict[str | None, list[datetime]] | None = None
self._internal = internal
self._child_job_syncs = child_job_syncs
self.concurrency = concurrency
self.throttle = throttle
@@ -258,6 +261,7 @@ class Job(CoreSysAttributes):
job = _job__use_existing
job.name = self.name
job.internal = self._internal
job.child_job_syncs = self._child_job_syncs
if job_group:
job.reference = job_group.job_reference
else:
@@ -265,6 +269,7 @@ class Job(CoreSysAttributes):
self.name,
job_group.job_reference if job_group else None,
internal=self._internal,
child_job_syncs=self._child_job_syncs,
)
try:

View File

@@ -152,7 +152,7 @@ async def test_jobs_tree_representation(api_client: TestClient, coresys: CoreSys
"name": "test_jobs_tree_alt",
"reference": None,
"uuid": ANY,
"progress": 0,
"progress": 100,
"stage": "end",
"done": True,
"child_jobs": [],
@@ -282,7 +282,7 @@ async def test_jobs_sorted(api_client: TestClient, coresys: CoreSys):
"name": "test_jobs_sorted_2",
"reference": None,
"uuid": ANY,
"progress": 0,
"progress": 100,
"stage": None,
"done": True,
"errors": [],
@@ -294,7 +294,7 @@ async def test_jobs_sorted(api_client: TestClient, coresys: CoreSys):
"name": "test_jobs_sorted_1",
"reference": None,
"uuid": ANY,
"progress": 0,
"progress": 100,
"stage": None,
"done": True,
"errors": [],
@@ -305,7 +305,7 @@ async def test_jobs_sorted(api_client: TestClient, coresys: CoreSys):
"name": "test_jobs_sorted_inner_1",
"reference": None,
"uuid": ANY,
"progress": 0,
"progress": 100,
"stage": None,
"done": True,
"errors": [],
@@ -317,7 +317,7 @@ async def test_jobs_sorted(api_client: TestClient, coresys: CoreSys):
"name": "test_jobs_sorted_inner_2",
"reference": None,
"uuid": ANY,
"progress": 0,
"progress": 100,
"stage": None,
"done": True,
"errors": [],

View File

@@ -1110,6 +1110,7 @@ def _make_backup_message_for_assert(
reference: str,
stage: str | None,
done: bool = False,
progress: float = 0.0,
):
"""Make a backup message to use for assert test."""
return {
@@ -1120,7 +1121,7 @@ def _make_backup_message_for_assert(
"name": f"backup_manager_{action}",
"reference": reference,
"uuid": ANY,
"progress": 0,
"progress": progress,
"stage": stage,
"done": done,
"parent_id": None,
@@ -1132,13 +1133,12 @@ def _make_backup_message_for_assert(
}
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_backup_progress(
coresys: CoreSys,
install_addon_ssh: Addon,
container: MagicMock,
ha_ws_client: AsyncMock,
tmp_supervisor_data,
path_extern,
):
"""Test progress is tracked during backups."""
container.status = "running"
@@ -1182,7 +1182,10 @@ async def test_backup_progress(
reference=full_backup.slug, stage="await_addon_restarts"
),
_make_backup_message_for_assert(
reference=full_backup.slug, stage="await_addon_restarts", done=True
reference=full_backup.slug,
stage="await_addon_restarts",
done=True,
progress=100,
),
]
@@ -1227,18 +1230,17 @@ async def test_backup_progress(
reference=partial_backup.slug,
stage="finishing_file",
done=True,
progress=100,
),
]
@pytest.mark.usefixtures("supervisor_internet", "tmp_supervisor_data", "path_extern")
async def test_restore_progress(
coresys: CoreSys,
supervisor_internet,
install_addon_ssh: Addon,
container: MagicMock,
ha_ws_client: AsyncMock,
tmp_supervisor_data,
path_extern,
):
"""Test progress is tracked during backups."""
container.status = "running"
@@ -1320,6 +1322,7 @@ async def test_restore_progress(
reference=full_backup.slug,
stage="await_home_assistant_restart",
done=True,
progress=100,
),
]
@@ -1358,6 +1361,7 @@ async def test_restore_progress(
reference=folders_backup.slug,
stage="folders",
done=True,
progress=100,
),
]
@@ -1404,17 +1408,17 @@ async def test_restore_progress(
reference=addon_backup.slug,
stage="addons",
done=True,
progress=100,
),
]
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_freeze_thaw(
coresys: CoreSys,
install_addon_ssh: Addon,
container: MagicMock,
ha_ws_client: AsyncMock,
tmp_supervisor_data,
path_extern,
):
"""Test manual freeze and thaw for external snapshots."""
container.status = "running"
@@ -1460,7 +1464,11 @@ async def test_freeze_thaw(
action="thaw_all", reference=None, stage=None
),
_make_backup_message_for_assert(
action="freeze_all", reference=None, stage="addons", done=True
action="freeze_all",
reference=None,
stage="addons",
done=True,
progress=100,
),
]
@@ -1488,7 +1496,11 @@ async def test_freeze_thaw(
action="thaw_all", reference=None, stage="addons"
),
_make_backup_message_for_assert(
action="thaw_all", reference=None, stage="addons", done=True
action="thaw_all",
reference=None,
stage="addons",
done=True,
progress=100,
),
]

View File

@@ -19,7 +19,7 @@ from supervisor.exceptions import (
)
from supervisor.host.const import HostFeature
from supervisor.host.manager import HostManager
from supervisor.jobs import JobSchedulerOptions, SupervisorJob
from supervisor.jobs import ChildJobSyncFilter, JobSchedulerOptions, SupervisorJob
from supervisor.jobs.const import JobConcurrency, JobThrottle
from supervisor.jobs.decorator import Job, JobCondition
from supervisor.jobs.job_group import JobGroup
@@ -1003,7 +1003,7 @@ async def test_internal_jobs_no_notify(coresys: CoreSys, ha_ws_client: AsyncMock
"name": "test_internal_jobs_no_notify_default",
"reference": None,
"uuid": ANY,
"progress": 0,
"progress": 100,
"stage": None,
"done": True,
"parent_id": None,
@@ -1415,3 +1415,87 @@ async def test_core_supported(coresys: CoreSys, caplog: pytest.LogCaptureFixture
coresys.jobs.ignore_conditions = [JobCondition.HOME_ASSISTANT_CORE_SUPPORTED]
assert await test.execute()
async def test_progress_syncing(coresys: CoreSys):
    """Test progress syncing from child jobs to parent.

    The parent job allocates 50% of its progress to each of two children
    (one plain child, one in a job group matched by reference), so parent
    progress should step 25 -> 75 -> 100 as the children advance.
    """
    # Events used to pause each child mid-run and to hand control back to
    # this test at each checkpoint
    group_child_event = asyncio.Event()
    child_event = asyncio.Event()
    execute_event = asyncio.Event()
    main_event = asyncio.Event()
    class TestClassGroup(JobGroup):
        """Test class group."""
        def __init__(self, coresys: CoreSys) -> None:
            # Group reference "test" must match the ChildJobSyncFilter below
            super().__init__(coresys, "test_class_group", "test")
        @Job(name="test_progress_syncing_group_child", internal=True)
        async def test_progress_syncing_group_child(self):
            """Test progress syncing group child."""
            # Report 50%, wake the test, then block until released
            coresys.jobs.current.progress = 50
            main_event.set()
            await group_child_event.wait()
            coresys.jobs.current.progress = 100
    class TestClass:
        """Test class."""
        def __init__(self, coresys: CoreSys):
            """Initialize the test class."""
            self.coresys = coresys
            self.test_group = TestClassGroup(coresys)
        @Job(
            name="test_progress_syncing_execute",
            child_job_syncs=[
                # Each child gets half of the parent's progress range
                ChildJobSyncFilter(
                    "test_progress_syncing_child_execute", progress_allocation=0.5
                ),
                ChildJobSyncFilter(
                    "test_progress_syncing_group_child",
                    reference="test",
                    progress_allocation=0.5,
                ),
            ],
        )
        async def test_progress_syncing_execute(self):
            """Test progress syncing execute."""
            await self.test_progress_syncing_child_execute()
            await self.test_group.test_progress_syncing_group_child()
            main_event.set()
            await execute_event.wait()
        @Job(name="test_progress_syncing_child_execute", internal=True)
        async def test_progress_syncing_child_execute(self):
            """Test progress syncing child execute."""
            # Report 50%, wake the test, then block until released
            coresys.jobs.current.progress = 50
            main_event.set()
            await child_event.wait()
            coresys.jobs.current.progress = 100
    test = TestClass(coresys)
    job, task = coresys.jobs.schedule_job(
        test.test_progress_syncing_execute, JobSchedulerOptions()
    )
    # First child should've set parent job to 25% progress
    # (0.5 allocation * 50% child progress)
    await main_event.wait()
    assert job.progress == 25
    # Now we run to middle of second job which should put us at 75%
    # (first child done = 50, plus 0.5 * 50 from the group child)
    main_event.clear()
    child_event.set()
    await main_event.wait()
    assert job.progress == 75
    # Finally let it run to the end and see progress is 100%
    main_event.clear()
    group_child_event.set()
    await main_event.wait()
    assert job.progress == 100
    # Release and check it is done
    execute_event.set()
    await task
    assert job.done

View File

@@ -219,7 +219,7 @@ async def test_notify_on_change(coresys: CoreSys, ha_ws_client: AsyncMock):
"name": TEST_JOB,
"reference": "test",
"uuid": ANY,
"progress": 50,
"progress": 100,
"stage": "test",
"done": True,
"parent_id": None,