supervisor/tests/misc/test_scheduler.py
Stefan Agner f6faa18409
Bump pre-commit ruff to 0.5.7 and reformat (#5242)
It seems that the codebase is not formatted with the latest ruff
version. This PR reformats the codebase with ruff 0.5.7.
2024-08-13 20:53:56 +02:00

78 lines
1.8 KiB
Python

"""Test Supervisor scheduler backend."""
import asyncio
from supervisor.const import CoreState
async def test_simple_task(coresys):
    """Register a non-repeating task and verify it ran exactly once."""
    coresys.core.state = CoreState.RUNNING
    calls = []

    async def test_task():
        """Record one invocation of the scheduled task."""
        calls.append(True)

    # NOTE(review): 0.1 is presumably the delay in seconds and False the
    # repeat flag -- confirm against Scheduler.register_task.
    coresys.scheduler.register_task(test_task, 0.1, False)

    # Wait well past the 0.1 value so a wrongly repeated task would show up.
    await asyncio.sleep(0.3)
    assert calls == [True]
async def test_simple_task_repeat(coresys):
    """Register a repeating task and verify it runs more than once."""
    coresys.core.state = CoreState.RUNNING
    calls = []

    async def test_task():
        """Record one invocation of the scheduled task."""
        calls.append(True)

    # NOTE(review): True is presumably the repeat flag -- confirm against
    # Scheduler.register_task.
    coresys.scheduler.register_task(test_task, 0.1, True)

    # Poll in 0.1 s steps (at most 5 times) and stop as soon as a second
    # run has been recorded, instead of always waiting the full time.
    for _attempt in range(5):
        await asyncio.sleep(0.1)
        if len(calls) >= 2:
            break

    assert len(calls) > 1
async def test_simple_task_shutdown(coresys):
    """Verify a repeating task no longer runs after scheduler shutdown."""
    coresys.core.state = CoreState.RUNNING
    calls = []

    async def test_task():
        """Record one invocation of the scheduled task."""
        calls.append(True)

    coresys.scheduler.register_task(test_task, 0.1, True)

    # Let the task repeat a few times, then stop the scheduler.
    await asyncio.sleep(0.3)
    await coresys.scheduler.shutdown()

    # Snapshot the number of runs seen up to the shutdown.
    runs_at_shutdown = len(calls)
    assert runs_at_shutdown > 1

    # The count must not grow once the scheduler has been shut down.
    await asyncio.sleep(0.2)
    assert len(calls) == runs_at_shutdown
async def test_simple_task_repeat_block(coresys):
    """Schedule a repeating task whose run outlasts its interval.

    The task sleeps for 2 s while being registered with a 0.1 value, so
    after 0.3 s exactly one run must have been recorded: further runs are
    expected to be held back while the first one is still in progress.
    """
    coresys.core.state = CoreState.RUNNING
    trigger = []

    async def test_task():
        """Record one run, then stay busy far longer than the interval."""
        trigger.append(True)
        await asyncio.sleep(2)

    coresys.scheduler.register_task(test_task, 0.1, True)
    try:
        await asyncio.sleep(0.3)
        assert len(trigger) == 1
    finally:
        # Stop the scheduler even when the assertion fails; at this point
        # the task is still inside its 2 s sleep and should not be left
        # behind for the following tests.
        await coresys.scheduler.shutdown()