Simplify Overkiz coordinator (step 2) and address feedback (#64028)

This commit is contained in:
Mick Vleeshouwer 2022-01-13 04:45:39 -08:00 committed by GitHub
parent 754e291e1e
commit 105a7e5109
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -13,16 +13,19 @@ from pyoverkiz.exceptions import (
NotAuthenticatedException,
TooManyRequestsException,
)
from pyoverkiz.models import Device, Place
from pyoverkiz.models import Device, Event, Place
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util.decorator import Registry
from .const import DOMAIN, UPDATE_INTERVAL
_LOGGER = logging.getLogger(__name__)
EVENT_HANDLERS = Registry()
class OverkizDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Device]]):
"""Class to manage fetching data from Overkiz platform."""
@ -56,8 +59,8 @@ class OverkizDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Device]]):
for device in devices
)
self.executions: dict[str, dict[str, str]] = {}
self.areas = self.places_to_area(places)
self._config_entry_id = config_entry_id
self.areas = self._places_to_area(places)
self.config_entry_id = config_entry_id
async def _async_update_data(self) -> dict[str, Device]:
"""Fetch Overkiz data via event listener."""
@ -88,57 +91,8 @@ class OverkizDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Device]]):
for event in events:
_LOGGER.debug(event)
if event.name == EventName.DEVICE_AVAILABLE:
self.devices[event.device_url].available = True
elif event.name in [
EventName.DEVICE_UNAVAILABLE,
EventName.DEVICE_DISABLED,
]:
self.devices[event.device_url].available = False
elif event.name in [
EventName.DEVICE_CREATED,
EventName.DEVICE_UPDATED,
]:
self.hass.async_create_task(
self.hass.config_entries.async_reload(self._config_entry_id)
)
elif event.name == EventName.DEVICE_REMOVED:
base_device_url, *_ = event.device_url.split("#")
registry = dr.async_get(self.hass)
if registered_device := registry.async_get_device(
{(DOMAIN, base_device_url)}
):
registry.async_remove_device(registered_device.id)
del self.devices[event.device_url]
elif event.name == EventName.DEVICE_STATE_CHANGED:
for state in event.device_states:
device = self.devices[event.device_url]
if (device_state := device.states[state.name]) is None:
device_state = state
device.states[state.name] = device_state
device_state.value = state.value
elif event.name == EventName.EXECUTION_REGISTERED:
if event.exec_id not in self.executions:
self.executions[event.exec_id] = {}
if not self.is_stateless:
self.update_interval = timedelta(seconds=1)
elif (
event.name == EventName.EXECUTION_STATE_CHANGED
and event.exec_id in self.executions
and event.new_state in [ExecutionState.COMPLETED, ExecutionState.FAILED]
):
del self.executions[event.exec_id]
if event_handler := EVENT_HANDLERS.get(event.name):
await event_handler(self, event)
if not self.executions:
self.update_interval = UPDATE_INTERVAL
@ -150,14 +104,99 @@ class OverkizDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Device]]):
_LOGGER.debug("Fetching all devices and state via /setup/devices")
return {d.device_url: d for d in await self.client.get_devices(refresh=True)}
def places_to_area(self, place: Place) -> dict[str, str]:
"""Convert places with sub_places to a flat dictionary."""
def _places_to_area(self, place: Place) -> dict[str, str]:
"""Convert places with sub_places to a flat dictionary [placeoid, label])."""
areas = {}
if isinstance(place, Place):
areas[place.oid] = place.label
if isinstance(place.sub_places, list):
for sub_place in place.sub_places:
areas.update(self.places_to_area(sub_place))
areas.update(self._places_to_area(sub_place))
return areas
@EVENT_HANDLERS.register(EventName.DEVICE_AVAILABLE)
async def on_device_available(
coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
"""Handle device available event."""
if event.device_url:
coordinator.devices[event.device_url].available = True
@EVENT_HANDLERS.register(EventName.DEVICE_UNAVAILABLE)
@EVENT_HANDLERS.register(EventName.DEVICE_DISABLED)
async def on_device_unavailable_disabled(
coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
"""Handle device unavailable / disabled event."""
if event.device_url:
coordinator.devices[event.device_url].available = False
@EVENT_HANDLERS.register(EventName.DEVICE_CREATED)
@EVENT_HANDLERS.register(EventName.DEVICE_UPDATED)
async def on_device_created_updated(
coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
"""Handle device unavailable / disabled event."""
coordinator.hass.async_create_task(
coordinator.hass.config_entries.async_reload(coordinator.config_entry_id)
)
@EVENT_HANDLERS.register(EventName.DEVICE_STATE_CHANGED)
async def on_device_state_changed(
coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
"""Handle device state changed event."""
if not event.device_url:
return
for state in event.device_states:
device = coordinator.devices[event.device_url]
device.states[state.name] = state
@EVENT_HANDLERS.register(EventName.DEVICE_REMOVED)
async def on_device_removed(
coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
"""Handle device removed event."""
if not event.device_url:
return
base_device_url = event.device_url.split("#")[0]
registry = dr.async_get(coordinator.hass)
if registered_device := registry.async_get_device({(DOMAIN, base_device_url)}):
registry.async_remove_device(registered_device.id)
if event.device_url:
del coordinator.devices[event.device_url]
@EVENT_HANDLERS.register(EventName.EXECUTION_REGISTERED)
async def on_execution_registered(
coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
"""Handle execution registered event."""
if event.exec_id and event.exec_id not in coordinator.executions:
coordinator.executions[event.exec_id] = {}
if not coordinator.is_stateless:
coordinator.update_interval = timedelta(seconds=1)
@EVENT_HANDLERS.register(EventName.EXECUTION_STATE_CHANGED)
async def on_execution_state_changed(
coordinator: OverkizDataUpdateCoordinator, event: Event
) -> None:
"""Handle execution changed event."""
if event.exec_id in coordinator.executions and event.new_state in [
ExecutionState.COMPLETED,
ExecutionState.FAILED,
]:
del coordinator.executions[event.exec_id]