Introduce pytest for new job decorator combinations

This commit is contained in:
Stefan Agner 2025-07-30 14:17:29 +02:00
parent 0c02518d1d
commit 0a2227b460
No known key found for this signature in database
GPG Key ID: AE01353D1E44747D

View File

@ -20,7 +20,7 @@ from supervisor.exceptions import (
from supervisor.host.const import HostFeature
from supervisor.host.manager import HostManager
from supervisor.jobs import JobSchedulerOptions, SupervisorJob
from supervisor.jobs.const import JobExecutionLimit
from supervisor.jobs.const import JobConcurrency, JobExecutionLimit, JobThrottle
from supervisor.jobs.decorator import Job, JobCondition
from supervisor.jobs.job_group import JobGroup
from supervisor.os.manager import OSManager
@ -1212,3 +1212,93 @@ async def test_job_scheduled_at(coresys: CoreSys):
assert job.name == "test_job_scheduled_at_job_task"
assert job.stage == "work"
assert job.parent_id is None
async def test_concurency_reject_and_throttle(coresys: CoreSys):
"""Test the concurrency rejct and throttle job execution limit."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.run = asyncio.Lock()
self.call = 0
@Job(
name="test_concurency_reject_and_throttle_execute",
concurrency=JobConcurrency.REJECT,
throttle=JobThrottle.THROTTLE,
throttle_period=timedelta(hours=1),
)
async def execute(self, sleep: float):
"""Execute the class method."""
assert not self.run.locked()
async with self.run:
await asyncio.sleep(sleep)
self.call += 1
test = TestClass(coresys)
results = await asyncio.gather(
*[test.execute(0.1), test.execute(0.1), test.execute(0.1)],
return_exceptions=True,
)
assert results[0] is None
assert isinstance(results[1], JobException)
assert isinstance(results[2], JobException)
assert test.call == 1
await asyncio.gather(*[test.execute(0.1)])
assert test.call == 1
@pytest.mark.parametrize("error", [None, PluginJobError])
async def test_concurency_reject_and_rate_limit(
coresys: CoreSys, error: JobException | None
):
"""Test the concurrency rejct and rate limit job execution limit."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
self.run = asyncio.Lock()
self.call = 0
@Job(
name=f"test_concurency_reject_and_rate_limit_execute_{uuid4().hex}",
concurrency=JobConcurrency.REJECT,
throttle=JobThrottle.RATE_LIMIT,
throttle_period=timedelta(hours=1),
throttle_max_calls=1,
on_condition=error,
)
async def execute(self, sleep: float = 0):
"""Execute the class method."""
async with self.run:
await asyncio.sleep(sleep)
self.call += 1
test = TestClass(coresys)
results = await asyncio.gather(
*[test.execute(0.1), test.execute(), test.execute()], return_exceptions=True
)
assert results[0] is None
assert isinstance(results[1], JobException)
assert isinstance(results[2], JobException)
assert test.call == 1
with pytest.raises(JobException if error is None else error):
await test.execute()
assert test.call == 1
with time_machine.travel(utcnow() + timedelta(hours=1)):
await test.execute()
assert test.call == 2