Files
supervisor/supervisor/bus.py
Mike Degatano 30cc172199 Migrate images from dockerpy to aiodocker (#6252)
* Migrate images from dockerpy to aiodocker

* Add missing coverage and fix bug in repair

* Bind libraries to different files and refactor images.pull

* Use the same socket again

Try using the same socket again.

* Fix pytest

---------

Co-authored-by: Stefan Agner <stefan@agner.ch>
2025-11-12 20:54:06 +01:00

56 lines
1.7 KiB
Python

"""Bus event system."""
from __future__ import annotations
from asyncio import Task
from collections.abc import Callable, Coroutine
import logging
from typing import Any
import attr
from .const import BusEvent
from .coresys import CoreSys, CoreSysAttributes
# Module-level logger, named after this module via ``__name__``.
_LOGGER: logging.Logger = logging.getLogger(__name__)
@attr.s(frozen=True, slots=True)
class EventListener:
    """Frozen record pairing an event type with the callback registered for it."""

    # Event type this listener was registered under.
    event_type: BusEvent = attr.ib()
    # Callable taking one argument and returning a coroutine; the bus calls it
    # with the reference object passed to ``Bus.fire_event``.
    callback: Callable[[Any], Coroutine[Any, Any, None]] = attr.ib()
class Bus(CoreSysAttributes):
    """Event bus that maps event types to registered listeners."""

    def __init__(self, coresys: CoreSys):
        """Set up an empty listener registry bound to ``coresys``."""
        # Registry: one list of listeners per event type, in registration order.
        self._listeners: dict[BusEvent, list[EventListener]] = {}
        self.coresys = coresys

    def register_event(
        self, event: BusEvent, callback: Callable[[Any], Coroutine[Any, Any, None]]
    ) -> EventListener:
        """Subscribe ``callback`` to ``event``.

        Returns the created ``EventListener``, which can later be handed to
        ``remove_listener`` to unsubscribe.
        """
        entry = EventListener(event, callback)
        # Create the per-event list on first registration.
        bucket = self._listeners.setdefault(event, [])
        bucket.append(entry)
        return entry

    def fire_event(self, event: BusEvent, reference: Any) -> list[Task]:
        """Dispatch ``event`` to every listener registered for it.

        Each listener's callback is called with ``reference`` and the resulting
        coroutine is passed to ``self.sys_create_task``. Returns the created
        tasks in registration order; the list is empty when nothing is
        registered for ``event``.
        """
        _LOGGER.debug("Fire event '%s' with '%s'", event, reference)
        return [
            self.sys_create_task(subscriber.callback(reference))
            for subscriber in self._listeners.get(event, [])
        ]

    def remove_listener(self, listener: EventListener) -> None:
        """Unregister a listener.

        Logs a warning instead of raising when the listener is not currently
        registered.
        """
        try:
            # KeyError: no listener was ever registered for this event type.
            registered = self._listeners[listener.event_type]
            # ValueError: this particular listener is not in the list.
            registered.remove(listener)
        except (KeyError, ValueError):
            _LOGGER.warning("Listener %s not registered", listener)