From 0a2227b460af52eaa5eb75b6ea66952854f2477a Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Wed, 30 Jul 2025 14:17:29 +0200 Subject: [PATCH] Introduce pytest for new job decorator combinations --- tests/jobs/test_job_decorator.py | 92 +++++++++++++++++++++++++++++++- 1 file changed, 91 insertions(+), 1 deletion(-) diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index e8f6be3b9..91b634af3 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -20,7 +20,7 @@ from supervisor.exceptions import ( from supervisor.host.const import HostFeature from supervisor.host.manager import HostManager from supervisor.jobs import JobSchedulerOptions, SupervisorJob -from supervisor.jobs.const import JobExecutionLimit +from supervisor.jobs.const import JobConcurrency, JobExecutionLimit, JobThrottle from supervisor.jobs.decorator import Job, JobCondition from supervisor.jobs.job_group import JobGroup from supervisor.os.manager import OSManager @@ -1212,3 +1212,93 @@ async def test_job_scheduled_at(coresys: CoreSys): assert job.name == "test_job_scheduled_at_job_task" assert job.stage == "work" assert job.parent_id is None + + +async def test_concurency_reject_and_throttle(coresys: CoreSys): + """Test the concurrency rejct and throttle job execution limit.""" + + class TestClass: + """Test class.""" + + def __init__(self, coresys: CoreSys): + """Initialize the test class.""" + self.coresys = coresys + self.run = asyncio.Lock() + self.call = 0 + + @Job( + name="test_concurency_reject_and_throttle_execute", + concurrency=JobConcurrency.REJECT, + throttle=JobThrottle.THROTTLE, + throttle_period=timedelta(hours=1), + ) + async def execute(self, sleep: float): + """Execute the class method.""" + assert not self.run.locked() + async with self.run: + await asyncio.sleep(sleep) + self.call += 1 + + test = TestClass(coresys) + + results = await asyncio.gather( + *[test.execute(0.1), test.execute(0.1), test.execute(0.1)], + return_exceptions=True, + ) + assert results[0] is None + assert isinstance(results[1], JobException) + assert isinstance(results[2], JobException) + assert test.call == 1 + + await asyncio.gather(*[test.execute(0.1)]) + assert test.call == 1 + + +@pytest.mark.parametrize("error", [None, PluginJobError]) +async def test_concurency_reject_and_rate_limit( + coresys: CoreSys, error: JobException | None +): + """Test the concurrency rejct and rate limit job execution limit.""" + + class TestClass: + """Test class.""" + + def __init__(self, coresys: CoreSys): + """Initialize the test class.""" + self.coresys = coresys + self.run = asyncio.Lock() + self.call = 0 + + @Job( + name=f"test_concurency_reject_and_rate_limit_execute_{uuid4().hex}", + concurrency=JobConcurrency.REJECT, + throttle=JobThrottle.RATE_LIMIT, + throttle_period=timedelta(hours=1), + throttle_max_calls=1, + on_condition=error, + ) + async def execute(self, sleep: float = 0): + """Execute the class method.""" + async with self.run: + await asyncio.sleep(sleep) + self.call += 1 + + test = TestClass(coresys) + + results = await asyncio.gather( + *[test.execute(0.1), test.execute(), test.execute()], return_exceptions=True + ) + assert results[0] is None + assert isinstance(results[1], JobException) + assert isinstance(results[2], JobException) + assert test.call == 1 + + with pytest.raises(JobException if error is None else error): + await test.execute() + + assert test.call == 1 + + with time_machine.travel(utcnow() + timedelta(hours=1)): + await test.execute() + + assert test.call == 2