Add bus system for handling events hw/pulse/docker (#2999)

* Add bus system for handling events hw/pulse/docker

* give sound update back

* register events

* Add tests

* Add debug logger

* Update supervisor/coresys.py

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
Pascal Vizeli 2021-08-09 19:30:26 +02:00 committed by GitHub
parent 9638775944
commit 31001280c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 271 additions and 189 deletions

View File

@ -21,6 +21,7 @@ from .api import RestAPI
from .arch import CpuArch from .arch import CpuArch
from .auth import Auth from .auth import Auth
from .backups.manager import BackupManager from .backups.manager import BackupManager
from .bus import Bus
from .const import ( from .const import (
ENV_HOMEASSISTANT_REPOSITORY, ENV_HOMEASSISTANT_REPOSITORY,
ENV_SUPERVISOR_MACHINE, ENV_SUPERVISOR_MACHINE,
@ -39,7 +40,7 @@ from .discovery import Discovery
from .hardware.module import HardwareManager from .hardware.module import HardwareManager
from .hassos import HassOS from .hassos import HassOS
from .homeassistant.module import HomeAssistant from .homeassistant.module import HomeAssistant
from .host import HostManager from .host.manager import HostManager
from .ingress import Ingress from .ingress import Ingress
from .misc.filter import filter_data from .misc.filter import filter_data
from .misc.scheduler import Scheduler from .misc.scheduler import Scheduler
@ -83,6 +84,7 @@ async def initialize_coresys() -> CoreSys:
coresys.hassos = HassOS(coresys) coresys.hassos = HassOS(coresys)
coresys.scheduler = Scheduler(coresys) coresys.scheduler = Scheduler(coresys)
coresys.security = Security(coresys) coresys.security = Security(coresys)
coresys.bus = Bus(coresys)
# diagnostics # diagnostics
setup_diagnostics(coresys) setup_diagnostics(coresys)

43
supervisor/bus.py Normal file
View File

@ -0,0 +1,43 @@
"""Bus event system."""
from __future__ import annotations
import logging
from typing import Any, Awaitable, Callable, Dict, List
import attr
from .const import BusEvent
from .coresys import CoreSys, CoreSysAttributes
_LOGGER: logging.Logger = logging.getLogger(__name__)
class Bus(CoreSysAttributes):
"""Handle Bus event system."""
def __init__(self, coresys: CoreSys):
"""Initialize bus backend."""
self.coresys = coresys
self._listeners: Dict[BusEvent, List[EventListener]] = {}
def register_event(
self, event: BusEvent, callback: Callable[[Any], Awaitable[None]]
) -> EventListener:
"""Register callback for an event."""
listener = EventListener(event, callback)
self._listeners.setdefault(event, []).append(listener)
return listener
def fire_event(self, event: BusEvent, reference: Any) -> None:
"""Fire an event to the bus."""
_LOGGER.debug("Fire event '%s' with '%s'", event, reference)
for listener in self._listeners.get(event, []):
self.sys_create_task(listener.callback(reference))
@attr.s(slots=True, frozen=True)
class EventListener:
"""Event listener."""
event_type: BusEvent = attr.ib()
callback: Callable[[Any], Awaitable[None]] = attr.ib()

View File

@ -446,3 +446,10 @@ class HostFeature(str, Enum):
SERVICES = "services" SERVICES = "services"
SHUTDOWN = "shutdown" SHUTDOWN = "shutdown"
TIMEDATE = "timedate" TIMEDATE = "timedate"
class BusEvent(str, Enum):
"""Bus event type."""
HARDWARE_NEW_DEVICE = "hardware_new_device"
HARDWARE_REMOVE_DEVICE = "hardware_remove_device"

View File

@ -28,7 +28,7 @@ if TYPE_CHECKING:
from .hardware.module import HardwareManager from .hardware.module import HardwareManager
from .hassos import HassOS from .hassos import HassOS
from .homeassistant.module import HomeAssistant from .homeassistant.module import HomeAssistant
from .host import HostManager from .host.manager import HostManager
from .ingress import Ingress from .ingress import Ingress
from .jobs import JobManager from .jobs import JobManager
from .misc.scheduler import Scheduler from .misc.scheduler import Scheduler
@ -40,6 +40,7 @@ if TYPE_CHECKING:
from .store import StoreManager from .store import StoreManager
from .supervisor import Supervisor from .supervisor import Supervisor
from .updater import Updater from .updater import Updater
from .bus import Bus
T = TypeVar("T") T = TypeVar("T")
@ -88,6 +89,7 @@ class CoreSys:
self._resolution: Optional[ResolutionManager] = None self._resolution: Optional[ResolutionManager] = None
self._jobs: Optional[JobManager] = None self._jobs: Optional[JobManager] = None
self._security: Optional[Security] = None self._security: Optional[Security] = None
self._bus: Optional[Bus] = None
# Set default header for aiohttp # Set default header for aiohttp
self._websession._default_headers = MappingProxyType( self._websession._default_headers = MappingProxyType(
@ -352,6 +354,20 @@ class CoreSys:
raise RuntimeError("DBusManager already set!") raise RuntimeError("DBusManager already set!")
self._dbus = value self._dbus = value
@property
def bus(self) -> Bus:
"""Return Bus object."""
if self._bus is None:
raise RuntimeError("Bus not set!")
return self._bus
@bus.setter
def bus(self, value: Bus) -> None:
"""Set a Bus object."""
if self._bus:
raise RuntimeError("Bus already set!")
self._bus = value
@property @property
def host(self) -> HostManager: def host(self) -> HostManager:
"""Return HostManager object.""" """Return HostManager object."""
@ -540,9 +556,14 @@ class CoreSysAttributes:
@property @property
def sys_core(self) -> Core: def sys_core(self) -> Core:
"""Return core object.""" """Return Core object."""
return self.coresys.core return self.coresys.core
@property
def sys_bus(self) -> Bus:
"""Return Bus object."""
return self.coresys.bus
@property @property
def sys_plugins(self) -> PluginManager: def sys_plugins(self) -> PluginManager:
"""Return PluginManager object.""" """Return PluginManager object."""

View File

@ -7,11 +7,11 @@ from typing import Optional
import pyudev import pyudev
from ..const import CoreState from ..const import BusEvent
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HardwareNotFound from ..exceptions import HardwareNotFound
from ..resolution.const import UnhealthyReason from ..resolution.const import UnhealthyReason
from .const import HardwareAction, PolicyGroup, UdevKernelAction, UdevSubsystem from .const import HardwareAction, UdevKernelAction
from .data import Device from .data import Device
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -70,8 +70,8 @@ class HwMonitor(CoreSysAttributes):
): ):
return return
hw_action = None hw_action: Optional[HardwareAction] = None
device = None device: Optional[Device] = None
## ##
# Remove # Remove
@ -121,65 +121,15 @@ class HwMonitor(CoreSysAttributes):
if kernel.action == UdevKernelAction.ADD: if kernel.action == UdevKernelAction.ADD:
hw_action = HardwareAction.ADD hw_action = HardwareAction.ADD
# Process Action # Ignore event for future processing
if ( if device is None or hw_action is None:
device
and hw_action
and self.sys_core.state in (CoreState.RUNNING, CoreState.FREEZE)
):
# New Sound device
if device.subsystem == UdevSubsystem.AUDIO:
await self._action_sound(device, hw_action)
# serial device
elif device.subsystem == UdevSubsystem.SERIAL:
await self._action_tty(device, hw_action)
# input device
elif device.subsystem == UdevSubsystem.INPUT:
await self._action_input(device, hw_action)
# USB device
elif device.subsystem == UdevSubsystem.USB:
await self._action_usb(device, hw_action)
# GPIO device
elif device.subsystem == UdevSubsystem.GPIO:
await self._action_gpio(device, hw_action)
async def _action_sound(self, device: Device, action: HardwareAction):
"""Process sound actions."""
if not self.sys_hardware.policy.is_match_cgroup(PolicyGroup.AUDIO, device):
return
_LOGGER.info("Detecting %s audio hardware - %s", action, device.path)
await self.sys_create_task(self.sys_host.sound.update())
async def _action_tty(self, device: Device, action: HardwareAction):
"""Process tty actions."""
if not device.by_id or not self.sys_hardware.policy.is_match_cgroup(
PolicyGroup.UART, device
):
return return
_LOGGER.info( _LOGGER.info(
"Detecting %s serial hardware %s - %s", action, device.path, device.by_id "Detecting %s hardware %s - %s", hw_action, device.path, device.by_id
) )
async def _action_input(self, device: Device, action: HardwareAction): # Fire Hardware event to bus
"""Process input actions.""" if hw_action == HardwareAction.ADD:
if not device.by_id: self.sys_bus.fire_event(BusEvent.HARDWARE_NEW_DEVICE, device)
return elif hw_action == HardwareAction.REMOVE:
_LOGGER.info( self.sys_bus.fire_event(BusEvent.HARDWARE_REMOVE_DEVICE, device)
"Detecting %s serial hardware %s - %s", action, device.path, device.by_id
)
async def _action_usb(self, device: Device, action: HardwareAction):
"""Process usb actions."""
if not self.sys_hardware.policy.is_match_cgroup(PolicyGroup.USB, device):
return
_LOGGER.info("Detecting %s usb hardware %s", action, device.path)
async def _action_gpio(self, device: Device, action: HardwareAction):
"""Process gpio actions."""
if not self.sys_hardware.policy.is_match_cgroup(PolicyGroup.GPIO, device):
return
_LOGGER.info("Detecting %s GPIO hardware %s", action, device.path)

View File

@ -1,124 +1 @@
"""Host function like audio, D-Bus or systemd.""" """Host backend."""
from contextlib import suppress
from functools import lru_cache
import logging
from typing import List
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HassioError, PulseAudioError
from .apparmor import AppArmorControl
from .const import HostFeature
from .control import SystemControl
from .info import InfoCenter
from .network import NetworkManager
from .services import ServiceManager
from .sound import SoundControl
_LOGGER: logging.Logger = logging.getLogger(__name__)
class HostManager(CoreSysAttributes):
"""Manage supported function from host."""
def __init__(self, coresys: CoreSys):
"""Initialize Host manager."""
self.coresys: CoreSys = coresys
self._apparmor: AppArmorControl = AppArmorControl(coresys)
self._control: SystemControl = SystemControl(coresys)
self._info: InfoCenter = InfoCenter(coresys)
self._services: ServiceManager = ServiceManager(coresys)
self._network: NetworkManager = NetworkManager(coresys)
self._sound: SoundControl = SoundControl(coresys)
@property
def apparmor(self) -> AppArmorControl:
"""Return host AppArmor handler."""
return self._apparmor
@property
def control(self) -> SystemControl:
"""Return host control handler."""
return self._control
@property
def info(self) -> InfoCenter:
"""Return host info handler."""
return self._info
@property
def services(self) -> ServiceManager:
"""Return host services handler."""
return self._services
@property
def network(self) -> NetworkManager:
"""Return host NetworkManager handler."""
return self._network
@property
def sound(self) -> SoundControl:
"""Return host PulseAudio control."""
return self._sound
@property
def features(self) -> List[HostFeature]:
"""Return a list of host features."""
return self.supported_features()
@lru_cache
def supported_features(self) -> List[HostFeature]:
"""Return a list of supported host features."""
features = []
if self.sys_dbus.systemd.is_connected:
features.extend(
[HostFeature.REBOOT, HostFeature.SHUTDOWN, HostFeature.SERVICES]
)
if self.sys_dbus.network.is_connected and self.sys_dbus.network.interfaces:
features.append(HostFeature.NETWORK)
if self.sys_dbus.hostname.is_connected:
features.append(HostFeature.HOSTNAME)
if self.sys_dbus.timedate.is_connected:
features.append(HostFeature.TIMEDATE)
if self.sys_dbus.agent.is_connected:
features.append(HostFeature.AGENT)
if self.sys_hassos.available:
features.append(HostFeature.HAOS)
return features
async def reload(self):
"""Reload host functions."""
await self.info.update()
if self.sys_dbus.systemd.is_connected:
await self.services.update()
if self.sys_dbus.network.is_connected:
await self.network.update()
if self.sys_dbus.agent.is_connected:
await self.sys_dbus.agent.update()
with suppress(PulseAudioError):
await self.sound.update()
_LOGGER.info("Host information reload completed")
self.supported_features.cache_clear() # pylint: disable=no-member
async def load(self):
"""Load host information."""
with suppress(HassioError):
await self.reload()
# Load profile data
try:
await self.apparmor.load()
except HassioError as err:
_LOGGER.warning("Loading host AppArmor on start failed: %s", err)

138
supervisor/host/manager.py Normal file
View File

@ -0,0 +1,138 @@
"""Host function like audio, D-Bus or systemd."""
from contextlib import suppress
from functools import lru_cache
import logging
from typing import List
from ..const import BusEvent
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HassioError, PulseAudioError
from ..hardware.const import PolicyGroup
from ..hardware.data import Device
from .apparmor import AppArmorControl
from .const import HostFeature
from .control import SystemControl
from .info import InfoCenter
from .network import NetworkManager
from .services import ServiceManager
from .sound import SoundControl
_LOGGER: logging.Logger = logging.getLogger(__name__)
class HostManager(CoreSysAttributes):
"""Manage supported function from host."""
def __init__(self, coresys: CoreSys):
"""Initialize Host manager."""
self.coresys: CoreSys = coresys
self._apparmor: AppArmorControl = AppArmorControl(coresys)
self._control: SystemControl = SystemControl(coresys)
self._info: InfoCenter = InfoCenter(coresys)
self._services: ServiceManager = ServiceManager(coresys)
self._network: NetworkManager = NetworkManager(coresys)
self._sound: SoundControl = SoundControl(coresys)
@property
def apparmor(self) -> AppArmorControl:
"""Return host AppArmor handler."""
return self._apparmor
@property
def control(self) -> SystemControl:
"""Return host control handler."""
return self._control
@property
def info(self) -> InfoCenter:
"""Return host info handler."""
return self._info
@property
def services(self) -> ServiceManager:
"""Return host services handler."""
return self._services
@property
def network(self) -> NetworkManager:
"""Return host NetworkManager handler."""
return self._network
@property
def sound(self) -> SoundControl:
"""Return host PulseAudio control."""
return self._sound
@property
def features(self) -> List[HostFeature]:
"""Return a list of host features."""
return self.supported_features()
@lru_cache
def supported_features(self) -> List[HostFeature]:
"""Return a list of supported host features."""
features = []
if self.sys_dbus.systemd.is_connected:
features.extend(
[HostFeature.REBOOT, HostFeature.SHUTDOWN, HostFeature.SERVICES]
)
if self.sys_dbus.network.is_connected and self.sys_dbus.network.interfaces:
features.append(HostFeature.NETWORK)
if self.sys_dbus.hostname.is_connected:
features.append(HostFeature.HOSTNAME)
if self.sys_dbus.timedate.is_connected:
features.append(HostFeature.TIMEDATE)
if self.sys_dbus.agent.is_connected:
features.append(HostFeature.AGENT)
if self.sys_hassos.available:
features.append(HostFeature.HAOS)
return features
async def reload(self):
"""Reload host functions."""
await self.info.update()
if self.sys_dbus.systemd.is_connected:
await self.services.update()
if self.sys_dbus.network.is_connected:
await self.network.update()
if self.sys_dbus.agent.is_connected:
await self.sys_dbus.agent.update()
with suppress(PulseAudioError):
await self.sound.update()
_LOGGER.info("Host information reload completed")
self.supported_features.cache_clear() # pylint: disable=no-member
async def load(self):
"""Load host information."""
with suppress(HassioError):
await self.reload()
# Register for events
self.sys_bus.register_event(BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events)
self.sys_bus.register_event(
BusEvent.HARDWARE_REMOVE_DEVICE, self._hardware_events
)
# Load profile data
try:
await self.apparmor.load()
except HassioError as err:
_LOGGER.warning("Loading host AppArmor on start failed: %s", err)
async def _hardware_events(self, device: Device) -> None:
"""Process hardware requests."""
if self.sys_hardware.policy.is_match_cgroup(PolicyGroup.AUDIO, device):
await self.sound.update()

44
tests/test_bus.py Normal file
View File

@ -0,0 +1,44 @@
"""Test bus backend."""
import asyncio
import pytest
from supervisor.const import BusEvent
from supervisor.coresys import CoreSys
@pytest.mark.asyncio
async def test_bus_event(coresys: CoreSys) -> None:
"""Test bus events over the backend."""
results = []
async def callback(data) -> None:
"""Test callback."""
results.append(data)
coresys.bus.register_event(BusEvent.HARDWARE_NEW_DEVICE, callback)
coresys.bus.fire_event(BusEvent.HARDWARE_NEW_DEVICE, None)
await asyncio.sleep(0)
assert results[-1] is None
coresys.bus.fire_event(BusEvent.HARDWARE_NEW_DEVICE, "test")
await asyncio.sleep(0)
assert results[-1] == "test"
@pytest.mark.asyncio
async def test_bus_event_not_called(coresys: CoreSys) -> None:
"""Test bus events over the backend."""
results = []
async def callback(data) -> None:
"""Test callback."""
results.append(data)
coresys.bus.register_event(BusEvent.HARDWARE_NEW_DEVICE, callback)
coresys.bus.fire_event(BusEvent.HARDWARE_REMOVE_DEVICE, None)
await asyncio.sleep(0)
assert len(results) == 0