Use class to manage subscriptions

This commit is contained in:
abmantis 2025-07-03 19:19:49 +01:00
parent 89a9ab699d
commit 5d553e5641

View File

@ -852,6 +852,91 @@ def async_extract_referenced_entity_ids(
return selected
class TargetSelectorStateChangeTracker:
"""Helper class to manage state change tracking for target selectors."""
def __init__(
self,
hass: HomeAssistant,
selector_data: TargetSelectorData,
job_type: HassJobType | None,
action: Callable[[Event[EventStateChangedData]], Any],
) -> None:
"""Initialize the state change tracker."""
self._hass = hass
self._selector_data = selector_data
self._job_type = job_type
self._action = action
self._state_change_unsub: CALLBACK_TYPE | None = None
self._registry_unsubs: list[CALLBACK_TYPE] = []
self._setup_tracking()
self._setup_registry_listeners()
def _track_entities_state_change(self) -> CALLBACK_TYPE:
"""Set up state change tracking for currently selected entities."""
selected = async_extract_referenced_entity_ids(
self._hass, self._selector_data, expand_group=False
)
@callback
def state_change_listener(event: Event[EventStateChangedData]) -> None:
"""Handle state change events."""
if (
event.data["entity_id"] in selected.referenced
or event.data["entity_id"] in selected.indirectly_referenced
):
self._action(event)
tracked_entities = selected.referenced.union(selected.indirectly_referenced)
_LOGGER.debug("Tracking state changes for entities: %s", tracked_entities)
return async_track_state_change_event(
self._hass, tracked_entities, state_change_listener, job_type=self._job_type
)
def _setup_tracking(self) -> None:
"""Initialize state change tracking."""
self._state_change_unsub = self._track_entities_state_change()
def _setup_registry_listeners(self) -> None:
"""Set up listeners for registry changes that require resubscription."""
@callback
def resubscribe_state_change_event(event: Event[Any] | None = None) -> None:
"""Resubscribe to state change events when registry changes."""
if self._state_change_unsub:
self._state_change_unsub()
self._state_change_unsub = self._track_entities_state_change()
self._registry_unsubs = [
self._hass.bus.async_listen(
entity_registry.EVENT_ENTITY_REGISTRY_UPDATED,
resubscribe_state_change_event,
# TODO(abmantis): filter for entities that match the target selector?
# event_filter=self._filter_entity_registry_changes,
),
self._hass.bus.async_listen(
device_registry.EVENT_DEVICE_REGISTRY_UPDATED,
resubscribe_state_change_event,
),
self._hass.bus.async_listen(
area_registry.EVENT_AREA_REGISTRY_UPDATED,
resubscribe_state_change_event,
),
]
def unsub(self) -> None:
"""Unsubscribe from all events."""
for registry_unsub in self._registry_unsubs:
registry_unsub()
self._registry_unsubs.clear()
if self._state_change_unsub:
self._state_change_unsub()
self._state_change_unsub = None
def async_track_target_selector_state_change_event(
hass: HomeAssistant,
target_selector_config: ConfigType,
@ -867,57 +952,5 @@ def async_track_target_selector_state_change_event(
)
return lambda: None
def track_entities_state_change() -> CALLBACK_TYPE:
selected = async_extract_referenced_entity_ids(
hass, selector_data, expand_group=False
)
@callback
def state_change_listener(event: Event[EventStateChangedData]) -> None:
"""Handle state change events."""
if (
event.data["entity_id"] in selected.referenced
or event.data["entity_id"] in selected.indirectly_referenced
):
action(event)
tracked_entities = selected.referenced.union(selected.indirectly_referenced)
_LOGGER.debug("Tracking state changes for entities: %s", tracked_entities)
return async_track_state_change_event(
hass, tracked_entities, state_change_listener, job_type=job_type
)
unsub_state_change = track_entities_state_change()
def resubscribe_state_change_event(event: Event[Any] | None = None) -> None:
# TODO(abmantis): Check if there is a better way to do this
nonlocal unsub_state_change
unsub_state_change()
unsub_state_change = track_entities_state_change()
unsub_registry_updates = [
hass.bus.async_listen(
entity_registry.EVENT_ENTITY_REGISTRY_UPDATED,
resubscribe_state_change_event,
# TODO(abmantis): filter for entities that match the target selector?
# event_filter=self._filter_entity_registry_changes,
),
hass.bus.async_listen(
device_registry.EVENT_DEVICE_REGISTRY_UPDATED,
resubscribe_state_change_event,
),
hass.bus.async_listen(
area_registry.EVENT_AREA_REGISTRY_UPDATED,
resubscribe_state_change_event,
),
]
def unsub() -> None:
"""Unsubscribe from state change and registry update events."""
for registry_unsub in unsub_registry_updates:
registry_unsub()
unsub_registry_updates.clear()
unsub_state_change()
return unsub
tracker = TargetSelectorStateChangeTracker(hass, selector_data, job_type, action)
return tracker.unsub