mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-04-23 20:57:15 +00:00

* Support dynamic device access cgroup * Clean listener better * Update supervisor/docker/addon.py Co-authored-by: Stefan Agner <stefan@agner.ch> * Update addon.py * Fix black Co-authored-by: Stefan Agner <stefan@agner.ch>
71 lines
1.8 KiB
Python
71 lines
1.8 KiB
Python
"""Test bus backend."""
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from supervisor.const import BusEvent
|
|
from supervisor.coresys import CoreSys
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bus_event(coresys: CoreSys) -> None:
    """Verify a registered listener receives the payload of each fired event."""
    received = []

    async def record(payload) -> None:
        """Remember every payload handed to the listener."""
        received.append(payload)

    bus = coresys.bus
    event = BusEvent.HARDWARE_NEW_DEVICE
    bus.register_event(event, record)

    # First round: a ``None`` payload must reach the listener unchanged.
    bus.fire_event(event, None)
    # Yield to the event loop once so a callback scheduled by the bus can run.
    await asyncio.sleep(0)
    assert received[-1] is None

    # Second round: a string payload must become the newest recorded entry.
    bus.fire_event(event, "test")
    await asyncio.sleep(0)
    assert received[-1] == "test"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bus_event_not_called(coresys: CoreSys) -> None:
    """Verify a listener is not invoked when a different event is fired."""
    received = []

    async def record(payload) -> None:
        """Remember every payload handed to the listener."""
        received.append(payload)

    bus = coresys.bus
    # Listen for NEW_DEVICE only ...
    bus.register_event(BusEvent.HARDWARE_NEW_DEVICE, record)

    # ... then fire REMOVE_DEVICE, which the listener is not registered for.
    bus.fire_event(BusEvent.HARDWARE_REMOVE_DEVICE, None)
    # Yield to the event loop once so anything scheduled by the bus could run.
    await asyncio.sleep(0)
    assert not received
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bus_event_removed(coresys: CoreSys) -> None:
    """Verify a listener stops receiving events once it has been removed."""
    received = []

    async def record(payload) -> None:
        """Remember every payload handed to the listener."""
        received.append(payload)

    bus = coresys.bus
    event = BusEvent.HARDWARE_NEW_DEVICE
    handle = bus.register_event(event, record)

    # While registered, each fired payload shows up as the newest entry.
    bus.fire_event(event, None)
    # Yield to the event loop once so a callback scheduled by the bus can run.
    await asyncio.sleep(0)
    assert received[-1] is None

    bus.fire_event(event, "test")
    await asyncio.sleep(0)
    assert received[-1] == "test"

    # After removal, firing again must leave the last recorded entry untouched.
    bus.remove_listener(handle)

    bus.fire_event(event, None)
    await asyncio.sleep(0)
    assert received[-1] == "test"
|