Compare commits

..

2 Commits

Author SHA1 Message Date
Stefan Agner
230f359bf7 Support containerd snapshotter progress reporting 2025-11-05 15:09:17 +01:00
Stefan Agner
f996e60784 Fix docker image pull progress blocked by small layers
Small Docker layers (typically <100 bytes) can skip the downloading phase
entirely, going directly from "Pulling fs layer" to "Download complete"
without emitting any progress events with byte counts. This caused the
aggregate progress calculation to block indefinitely, as it required all
layer jobs to have their `extra` field populated with byte counts before
proceeding.

The issue manifested as parent job progress jumping from 0% to 97.9% after
long delays, as seen when a 96-byte layer held up progress reporting for
~50 seconds until it finally reached the "Extracting" phase.

Set a minimal `extra` field (current=1, total=1) when layers reach
"Download complete" without having gone through the downloading phase.
This allows the aggregate progress calculation to proceed immediately
while still correctly representing the layer as 100% downloaded.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 14:40:16 +01:00
3 changed files with 143 additions and 63 deletions

View File

@@ -4,7 +4,7 @@ atomicwrites-homeassistant==1.4.1
attrs==25.4.0
awesomeversion==25.8.0
blockbuster==1.5.25
brotli==1.2.0
brotli==1.1.0
ciso8601==2.3.3
colorlog==6.10.1
cpe==1.3.1

View File

@@ -341,23 +341,40 @@ class DockerInterface(JobGroup, ABC):
]
# First set the total bytes to be downloaded/extracted on the main job
# Note: With containerd snapshotter, total may be None (time-based progress instead of byte-based)
if not install_job.extra:
total = 0
has_byte_progress = True
for job in layer_jobs:
if not job.extra:
return
# If any layer has None for total, we can't do byte-weighted aggregation
if job.extra["total"] is None:
has_byte_progress = False
break
total += job.extra["total"]
install_job.extra = {"total": total}
# Store whether we have byte-based progress for later use
install_job.extra = {"total": total if has_byte_progress else None}
else:
total = install_job.extra["total"]
has_byte_progress = total is not None
# Then determine total progress based on progress of each sub-job, factoring in size of each compared to total
# Then determine total progress based on progress of each sub-job
# If we have byte counts, weight by size. Otherwise, simple average.
progress = 0.0
stage = PullImageLayerStage.PULL_COMPLETE
for job in layer_jobs:
if not job.extra:
return
progress += job.progress * (job.extra["total"] / total)
if has_byte_progress:
# Byte-weighted progress (classic Docker behavior)
progress += job.progress * (job.extra["total"] / total)
else:
# Simple average progress (containerd snapshotter with time-based progress)
progress += job.progress / len(layer_jobs)
job_stage = PullImageLayerStage.from_status(cast(str, job.stage))
if job_stage < PullImageLayerStage.EXTRACTING:

View File

@@ -576,78 +576,54 @@ async def test_install_progress_handles_layers_skipping_download(
test_docker_interface: DockerInterface,
capture_exception: Mock,
):
"""Test install handles small layers that skip downloading phase and go directly to download complete.
Reproduces the real-world scenario from Supervisor issue #6286:
- Small layer (02a6e69d8d00) completes Download complete at 10:14:08 without ever Downloading
- Normal layer (3f4a84073184) starts Downloading at 10:14:09 with progress updates
"""
"""Test install handles small layers that skip downloading phase and go directly to download complete."""
coresys.core.set_state(CoreState.RUNNING)
# Reproduce EXACT sequence from SupervisorNoUpdateProgressLogs.txt:
# Small layer (02a6e69d8d00) completes BEFORE normal layer (3f4a84073184) starts downloading
# Simulate multiple layers where one small layer (96 bytes) skips the downloading phase
# This layer should not block progress reporting for the parent job
coresys.docker.docker.api.pull.return_value = [
{"status": "Pulling from test/image", "id": "latest"},
# Small layer that skips downloading (02a6e69d8d00 in logs, 96 bytes)
{"status": "Pulling fs layer", "progressDetail": {}, "id": "02a6e69d8d00"},
{"status": "Pulling fs layer", "progressDetail": {}, "id": "3f4a84073184"},
{"status": "Waiting", "progressDetail": {}, "id": "02a6e69d8d00"},
{"status": "Waiting", "progressDetail": {}, "id": "3f4a84073184"},
# Goes straight to Download complete (10:14:08 in logs) - THIS IS THE KEY MOMENT
{"status": "Download complete", "progressDetail": {}, "id": "02a6e69d8d00"},
# Normal layer that downloads (3f4a84073184 in logs, 25MB)
# Downloading starts (10:14:09 in logs) - progress updates should happen NOW!
# Layer 1: Normal layer with downloading phase
{"status": "Pulling fs layer", "progressDetail": {}, "id": "layer1"},
{
"status": "Downloading",
"progressDetail": {"current": 260937, "total": 25371463},
"progress": "[> ] 260.9kB/25.37MB",
"id": "3f4a84073184",
"progressDetail": {"current": 100, "total": 1000},
"progress": "[=====> ] 100B/1000B",
"id": "layer1",
},
{
"status": "Downloading",
"progressDetail": {"current": 5505024, "total": 25371463},
"progress": "[==========> ] 5.505MB/25.37MB",
"id": "3f4a84073184",
"progressDetail": {"current": 1000, "total": 1000},
"progress": "[==================================================>] 1000B/1000B",
"id": "layer1",
},
{
"status": "Downloading",
"progressDetail": {"current": 11272192, "total": 25371463},
"progress": "[======================> ] 11.27MB/25.37MB",
"id": "3f4a84073184",
},
{"status": "Download complete", "progressDetail": {}, "id": "3f4a84073184"},
{"status": "Download complete", "progressDetail": {}, "id": "layer1"},
{
"status": "Extracting",
"progressDetail": {"current": 25371463, "total": 25371463},
"progress": "[==================================================>] 25.37MB/25.37MB",
"id": "3f4a84073184",
"progressDetail": {"current": 1000, "total": 1000},
"progress": "[==================================================>] 1000B/1000B",
"id": "layer1",
},
{"status": "Pull complete", "progressDetail": {}, "id": "3f4a84073184"},
# Small layer finally extracts (10:14:58 in logs)
{"status": "Pull complete", "progressDetail": {}, "id": "layer1"},
# Layer 2: Small layer that skips downloading (like 02a6e69d8d00 from the logs)
{"status": "Pulling fs layer", "progressDetail": {}, "id": "layer2"},
{"status": "Waiting", "progressDetail": {}, "id": "layer2"},
# Goes straight to Download complete without Downloading phase
{"status": "Download complete", "progressDetail": {}, "id": "layer2"},
{
"status": "Extracting",
"progressDetail": {"current": 96, "total": 96},
"progress": "[==================================================>] 96B/96B",
"id": "02a6e69d8d00",
"id": "layer2",
},
{"status": "Pull complete", "progressDetail": {}, "id": "02a6e69d8d00"},
{"status": "Pull complete", "progressDetail": {}, "id": "layer2"},
{"status": "Digest: sha256:test"},
{"status": "Status: Downloaded newer image for test/image:latest"},
]
# Capture immutable snapshots of install job progress using job.as_dict()
# This solves the mutable object problem - we snapshot state at call time
install_job_snapshots = []
original_on_job_change = coresys.jobs._on_job_change # pylint: disable=W0212
def capture_and_forward(job_obj, attribute, value):
# Capture immutable snapshot if this is the install job with progress
if job_obj.name == "docker_interface_install" and job_obj.progress > 0:
install_job_snapshots.append(job_obj.as_dict())
# Forward to original to maintain functionality
return original_on_job_change(job_obj, attribute, value)
with patch.object(coresys.jobs, "_on_job_change", side_effect=capture_and_forward):
with patch.object(
type(coresys.supervisor), "arch", PropertyMock(return_value="amd64")
):
# Schedule job so we can observe that it completes successfully
event = asyncio.Event()
job, install_task = coresys.jobs.schedule_job(
test_docker_interface.install,
@@ -665,13 +641,100 @@ async def test_install_progress_handles_layers_skipping_download(
await install_task
await event.wait()
# First update from layer download should have rather low progress ((260937/25445459) / 2 ~ 0.5%)
assert install_job_snapshots[0]["progress"] < 1
# The key assertion: Job should complete successfully without errors
# Without the fix, layer2 would block all progress reporting until it reached Extracting,
# preventing the aggregate progress calculation from running
assert job.done is True
assert job.progress == 100
capture_exception.assert_not_called()
# Total 8 events should lead to a progress update on the install job
assert len(install_job_snapshots) == 8
# Job should complete successfully
assert job.done is True
assert job.progress == 100
capture_exception.assert_not_called()
async def test_install_progress_handles_containerd_snapshotter(
coresys: CoreSys,
test_docker_interface: DockerInterface,
capture_exception: Mock,
):
"""Test install handles containerd snapshotter time-based progress (total=None)."""
coresys.core.set_state(CoreState.RUNNING)
# Containerd snapshotter reports extraction progress as time elapsed (e.g., "7 s")
# with current=7, total=None instead of byte-based progress
coresys.docker.docker.api.pull.return_value = [
{"status": "Pulling from test/image", "id": "latest"},
{"status": "Pulling fs layer", "progressDetail": {}, "id": "layer1"},
{
"status": "Downloading",
"progressDetail": {"current": 100, "total": 1000},
"progress": "[=====> ] 100B/1000B",
"id": "layer1",
},
{
"status": "Downloading",
"progressDetail": {"current": 1000, "total": 1000},
"progress": "[==================================================>] 1000B/1000B",
"id": "layer1",
},
{"status": "Download complete", "progressDetail": {}, "id": "layer1"},
{
"status": "Extracting",
"progressDetail": {"current": 1000, "total": 1000},
"progress": "[==================================================>] 1000B/1000B",
"id": "layer1",
},
{"status": "Pull complete", "progressDetail": {}, "id": "layer1"},
# Layer 2: Containerd snapshotter with time-based extraction
{"status": "Pulling fs layer", "progressDetail": {}, "id": "layer2"},
{
"status": "Downloading",
"progressDetail": {"current": 50, "total": 500},
"progress": "[=====> ] 50B/500B",
"id": "layer2",
},
{
"status": "Downloading",
"progressDetail": {"current": 500, "total": 500},
"progress": "[==================================================>] 500B/500B",
"id": "layer2",
},
{"status": "Download complete", "progressDetail": {}, "id": "layer2"},
# Time-based extraction progress (containerd snapshotter)
{
"status": "Extracting",
"progressDetail": {"current": 3, "total": None},
"progress": "3 s",
"id": "layer2",
},
{
"status": "Extracting",
"progressDetail": {"current": 7, "total": None},
"progress": "7 s",
"id": "layer2",
},
{"status": "Pull complete", "progressDetail": {}, "id": "layer2"},
{"status": "Digest: sha256:test"},
{"status": "Status: Downloaded newer image for test/image:latest"},
]
with patch.object(
type(coresys.supervisor), "arch", PropertyMock(return_value="amd64")
):
event = asyncio.Event()
job, install_task = coresys.jobs.schedule_job(
test_docker_interface.install,
JobSchedulerOptions(),
AwesomeVersion("1.2.3"),
"test",
)
async def listen_for_job_end(reference: SupervisorJob):
if reference.uuid != job.uuid:
return
event.set()
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, listen_for_job_end)
await install_task
await event.wait()
# The key assertion: Job should complete without crashing on None total
assert job.done is True
assert job.progress == 100
capture_exception.assert_not_called()