supervisor/tests/misc/test_scheduler.py
2020-08-27 10:33:35 +02:00

73 lines
1.7 KiB
Python

"""Test Supervisor scheduler backend."""
import asyncio
from supervisor.const import CoreState
async def test_simple_task(coresys):
"""Schedule a simple task."""
coresys.core.state = CoreState.RUNNING
trigger = []
async def test_task():
"""Test task for schedule."""
trigger.append(True)
coresys.scheduler.register_task(test_task, 0.1, False)
await asyncio.sleep(0.3)
assert len(trigger) == 1
async def test_simple_task_repeat(coresys):
"""Schedule a simple task and repeat."""
coresys.core.state = CoreState.RUNNING
trigger = []
async def test_task():
"""Test task for schedule."""
trigger.append(True)
coresys.scheduler.register_task(test_task, 0.1, True)
await asyncio.sleep(0.3)
assert len(trigger) > 1
async def test_simple_task_shutdown(coresys):
"""Schedule a simple task with shudown."""
coresys.core.state = CoreState.RUNNING
trigger = []
async def test_task():
"""Test task for schedule."""
trigger.append(True)
coresys.scheduler.register_task(test_task, 0.1, True)
await asyncio.sleep(0.3)
await coresys.scheduler.shutdown()
assert len(trigger) > 1
old = len(trigger)
await asyncio.sleep(0.2)
assert len(trigger) == old
async def test_simple_task_repeat_block(coresys):
"""Schedule a simple task with repeat and block."""
coresys.core.state = CoreState.RUNNING
trigger = []
async def test_task():
"""Test task for schedule."""
trigger.append(True)
await asyncio.sleep(2)
coresys.scheduler.register_task(test_task, 0.1, True)
await asyncio.sleep(0.3)
assert len(trigger) == 1
await coresys.scheduler.shutdown()