Implement update coordinator in Proximity (#103443)

This commit is contained in:
Michael 2023-11-12 18:28:39 +01:00 committed by GitHub
parent 50e11a7a37
commit b2f31d5763
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 330 additions and 264 deletions

View File

@ -5,46 +5,26 @@ import logging
import voluptuous as vol
from homeassistant.const import (
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONF_DEVICES,
CONF_UNIT_OF_MEASUREMENT,
CONF_ZONE,
UnitOfLength,
)
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.const import CONF_DEVICES, CONF_UNIT_OF_MEASUREMENT, CONF_ZONE
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_state_change
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.location import distance
from homeassistant.util.unit_conversion import DistanceConverter
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
ATTR_DIR_OF_TRAVEL,
ATTR_NEAREST,
CONF_IGNORED_ZONES,
CONF_TOLERANCE,
DEFAULT_PROXIMITY_ZONE,
DEFAULT_TOLERANCE,
DOMAIN,
UNITS,
)
from .coordinator import ProximityDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
ATTR_DIR_OF_TRAVEL = "dir_of_travel"
ATTR_DIST_FROM = "dist_to_zone"
ATTR_NEAREST = "nearest"
CONF_IGNORED_ZONES = "ignored_zones"
CONF_TOLERANCE = "tolerance"
DEFAULT_DIR_OF_TRAVEL = "not set"
DEFAULT_DIST_TO_ZONE = "not set"
DEFAULT_NEAREST = "not set"
DEFAULT_PROXIMITY_ZONE = "home"
DEFAULT_TOLERANCE = 1
DOMAIN = "proximity"
UNITS = [
UnitOfLength.METERS,
UnitOfLength.KILOMETERS,
UnitOfLength.FEET,
UnitOfLength.YARDS,
UnitOfLength.MILES,
]
ZONE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ZONE, default=DEFAULT_PROXIMITY_ZONE): cv.string,
@ -62,52 +42,22 @@ CONFIG_SCHEMA = vol.Schema(
)
@callback
def async_setup_proximity_component(
hass: HomeAssistant, name: str, config: ConfigType
) -> bool:
"""Set up the individual proximity component."""
ignored_zones: list[str] = config[CONF_IGNORED_ZONES]
proximity_devices: list[str] = config[CONF_DEVICES]
tolerance: int = config[CONF_TOLERANCE]
proximity_zone = config[CONF_ZONE]
unit_of_measurement: str = config.get(
CONF_UNIT_OF_MEASUREMENT, hass.config.units.length_unit
)
zone_friendly_name = name
proximity = Proximity(
hass,
zone_friendly_name,
DEFAULT_DIST_TO_ZONE,
DEFAULT_DIR_OF_TRAVEL,
DEFAULT_NEAREST,
ignored_zones,
proximity_devices,
tolerance,
proximity_zone,
unit_of_measurement,
)
proximity.entity_id = f"{DOMAIN}.{zone_friendly_name}"
proximity.async_write_ha_state()
async_track_state_change(
hass, proximity_devices, proximity.async_check_proximity_state_change
)
return True
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Get the zones and offsets from configuration.yaml."""
hass.data.setdefault(DOMAIN, {})
for zone, proximity_config in config[DOMAIN].items():
async_setup_proximity_component(hass, zone, proximity_config)
_LOGGER.debug("setup %s with config:%s", zone, proximity_config)
coordinator = ProximityDataUpdateCoordinator(hass, zone, proximity_config)
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][zone] = coordinator
proximity = Proximity(hass, zone, coordinator)
proximity.async_write_ha_state()
await proximity.async_added_to_hass()
return True
class Proximity(Entity):
class Proximity(CoordinatorEntity[ProximityDataUpdateCoordinator]):
"""Representation of a Proximity."""
# This entity is legacy and does not have a platform.
@ -117,203 +67,26 @@ class Proximity(Entity):
def __init__(
self,
hass: HomeAssistant,
zone_friendly_name: str,
dist_to: str,
dir_of_travel: str,
nearest: str,
ignored_zones: list[str],
proximity_devices: list[str],
tolerance: int,
proximity_zone: str,
unit_of_measurement: str,
friendly_name: str,
coordinator: ProximityDataUpdateCoordinator,
) -> None:
"""Initialize the proximity."""
super().__init__(coordinator)
self.hass = hass
self.friendly_name = zone_friendly_name
self.dist_to: str | int = dist_to
self.dir_of_travel = dir_of_travel
self.nearest = nearest
self.ignored_zones = ignored_zones
self.proximity_devices = proximity_devices
self.tolerance = tolerance
self.proximity_zone = proximity_zone
self._unit_of_measurement = unit_of_measurement
self.entity_id = f"{DOMAIN}.{friendly_name}"
self._attr_name = friendly_name
self._attr_unit_of_measurement = self.coordinator.unit_of_measurement
@property
def name(self) -> str:
"""Return the name of the entity."""
return self.friendly_name
@property
def state(self) -> str | int:
def state(self) -> str | int | float:
"""Return the state."""
return self.dist_to
@property
def unit_of_measurement(self) -> str:
"""Return the unit of measurement of this entity."""
return self._unit_of_measurement
return self.coordinator.data["dist_to_zone"]
@property
def extra_state_attributes(self) -> dict[str, str]:
"""Return the state attributes."""
return {ATTR_DIR_OF_TRAVEL: self.dir_of_travel, ATTR_NEAREST: self.nearest}
@callback
def async_check_proximity_state_change(
self, entity: str, old_state: State | None, new_state: State | None
) -> None:
"""Perform the proximity checking."""
if new_state is None:
return
entity_name = new_state.name
devices_to_calculate = False
devices_in_zone = ""
zone_state = self.hass.states.get(f"zone.{self.proximity_zone}")
proximity_latitude = (
zone_state.attributes.get(ATTR_LATITUDE) if zone_state else None
)
proximity_longitude = (
zone_state.attributes.get(ATTR_LONGITUDE) if zone_state else None
)
# Check for devices in the monitored zone.
for device in self.proximity_devices:
if (device_state := self.hass.states.get(device)) is None:
devices_to_calculate = True
continue
if device_state.state not in self.ignored_zones:
devices_to_calculate = True
# Check the location of all devices.
if (device_state.state).lower() == (self.proximity_zone).lower():
device_friendly = device_state.name
if devices_in_zone != "":
devices_in_zone = f"{devices_in_zone}, "
devices_in_zone = devices_in_zone + device_friendly
# No-one to track so reset the entity.
if not devices_to_calculate:
self.dist_to = "not set"
self.dir_of_travel = "not set"
self.nearest = "not set"
self.async_write_ha_state()
return
# At least one device is in the monitored zone so update the entity.
if devices_in_zone != "":
self.dist_to = 0
self.dir_of_travel = "arrived"
self.nearest = devices_in_zone
self.async_write_ha_state()
return
# We can't check proximity because latitude and longitude don't exist.
if "latitude" not in new_state.attributes:
return
# Collect distances to the zone for all devices.
distances_to_zone: dict[str, float] = {}
for device in self.proximity_devices:
# Ignore devices in an ignored zone.
device_state = self.hass.states.get(device)
if not device_state or device_state.state in self.ignored_zones:
continue
# Ignore devices if proximity cannot be calculated.
if "latitude" not in device_state.attributes:
continue
# Calculate the distance to the proximity zone.
proximity = distance(
proximity_latitude,
proximity_longitude,
device_state.attributes[ATTR_LATITUDE],
device_state.attributes[ATTR_LONGITUDE],
)
# Add the device and distance to a dictionary.
if not proximity:
continue
distances_to_zone[device] = round(
DistanceConverter.convert(
proximity, UnitOfLength.METERS, self.unit_of_measurement
),
1,
)
# Loop through each of the distances collected and work out the
# closest.
closest_device: str | None = None
dist_to_zone: float | None = None
for device, zone in distances_to_zone.items():
if not dist_to_zone or zone < dist_to_zone:
closest_device = device
dist_to_zone = zone
# If the closest device is one of the other devices.
if closest_device is not None and closest_device != entity:
self.dist_to = round(distances_to_zone[closest_device])
self.dir_of_travel = "unknown"
device_state = self.hass.states.get(closest_device)
assert device_state
self.nearest = device_state.name
self.async_write_ha_state()
return
# Stop if we cannot calculate the direction of travel (i.e. we don't
# have a previous state and a current LAT and LONG).
if old_state is None or "latitude" not in old_state.attributes:
self.dist_to = round(distances_to_zone[entity])
self.dir_of_travel = "unknown"
self.nearest = entity_name
self.async_write_ha_state()
return
# Reset the variables
distance_travelled: float = 0
# Calculate the distance travelled.
old_distance = distance(
proximity_latitude,
proximity_longitude,
old_state.attributes[ATTR_LATITUDE],
old_state.attributes[ATTR_LONGITUDE],
)
new_distance = distance(
proximity_latitude,
proximity_longitude,
new_state.attributes[ATTR_LATITUDE],
new_state.attributes[ATTR_LONGITUDE],
)
assert new_distance is not None and old_distance is not None
distance_travelled = round(new_distance - old_distance, 1)
# Check for tolerance
if distance_travelled < self.tolerance * -1:
direction_of_travel = "towards"
elif distance_travelled > self.tolerance:
direction_of_travel = "away_from"
else:
direction_of_travel = "stationary"
# Update the proximity entity
self.dist_to = (
round(dist_to_zone) if dist_to_zone is not None else DEFAULT_DIST_TO_ZONE
)
self.dir_of_travel = direction_of_travel
self.nearest = entity_name
self.async_write_ha_state()
_LOGGER.debug(
"proximity.%s update entity: distance=%s: direction=%s: device=%s",
self.friendly_name,
self.dist_to,
direction_of_travel,
entity_name,
)
_LOGGER.info("%s: proximity calculation complete", entity_name)
return {
ATTR_DIR_OF_TRAVEL: str(self.coordinator.data["dir_of_travel"]),
ATTR_NEAREST: str(self.coordinator.data["nearest"]),
}

View File

@ -0,0 +1,25 @@
"""Constants for Proximity integration."""
from homeassistant.const import UnitOfLength
ATTR_DIR_OF_TRAVEL = "dir_of_travel"
ATTR_DIST_TO = "dist_to_zone"
ATTR_NEAREST = "nearest"
CONF_IGNORED_ZONES = "ignored_zones"
CONF_TOLERANCE = "tolerance"
DEFAULT_DIR_OF_TRAVEL = "not set"
DEFAULT_DIST_TO_ZONE = "not set"
DEFAULT_NEAREST = "not set"
DEFAULT_PROXIMITY_ZONE = "home"
DEFAULT_TOLERANCE = 1
DOMAIN = "proximity"
UNITS = [
UnitOfLength.METERS,
UnitOfLength.KILOMETERS,
UnitOfLength.FEET,
UnitOfLength.YARDS,
UnitOfLength.MILES,
]

View File

@ -0,0 +1,268 @@
"""Data update coordinator for the Proximity integration."""
from dataclasses import dataclass
import logging
from typing import TypedDict
from homeassistant.const import (
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONF_DEVICES,
CONF_UNIT_OF_MEASUREMENT,
CONF_ZONE,
UnitOfLength,
)
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.event import async_track_state_change
from homeassistant.helpers.typing import ConfigType
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util.location import distance
from homeassistant.util.unit_conversion import DistanceConverter
from .const import (
CONF_IGNORED_ZONES,
CONF_TOLERANCE,
DEFAULT_DIR_OF_TRAVEL,
DEFAULT_DIST_TO_ZONE,
DEFAULT_NEAREST,
)
_LOGGER = logging.getLogger(__name__)
@dataclass
class StateChangedData:
"""StateChangedData class."""
entity_id: str
old_state: State | None
new_state: State | None
class ProximityData(TypedDict):
"""ProximityData type class."""
dist_to_zone: str | float
dir_of_travel: str | float
nearest: str | float
class ProximityDataUpdateCoordinator(DataUpdateCoordinator[ProximityData]):
"""Proximity data update coordinator."""
def __init__(
self, hass: HomeAssistant, friendly_name: str, config: ConfigType
) -> None:
"""Initialize the Proximity coordinator."""
self.ignored_zones: list[str] = config[CONF_IGNORED_ZONES]
self.proximity_devices: list[str] = config[CONF_DEVICES]
self.tolerance: int = config[CONF_TOLERANCE]
self.proximity_zone: str = config[CONF_ZONE]
self.unit_of_measurement: str = config.get(
CONF_UNIT_OF_MEASUREMENT, hass.config.units.length_unit
)
self.friendly_name = friendly_name
super().__init__(
hass,
_LOGGER,
name=friendly_name,
update_interval=None,
)
self.data = {
"dist_to_zone": DEFAULT_DIST_TO_ZONE,
"dir_of_travel": DEFAULT_DIR_OF_TRAVEL,
"nearest": DEFAULT_NEAREST,
}
self.state_change_data: StateChangedData | None = None
async_track_state_change(
hass, self.proximity_devices, self.async_check_proximity_state_change
)
async def async_check_proximity_state_change(
self, entity: str, old_state: State | None, new_state: State | None
) -> None:
"""Fetch and process state change event."""
if new_state is None:
_LOGGER.debug("no new_state -> abort")
return
# We can't check proximity because latitude and longitude don't exist.
if "latitude" not in new_state.attributes:
_LOGGER.debug("no latitude and longitude -> abort")
return
self.state_change_data = StateChangedData(entity, old_state, new_state)
await self.async_refresh()
async def _async_update_data(self) -> ProximityData:
"""Calculate Proximity data."""
if self.state_change_data is None or self.state_change_data.new_state is None:
return self.data
entity_name = self.state_change_data.new_state.name
devices_to_calculate = False
devices_in_zone = ""
zone_state = self.hass.states.get(f"zone.{self.proximity_zone}")
proximity_latitude = (
zone_state.attributes.get(ATTR_LATITUDE) if zone_state else None
)
proximity_longitude = (
zone_state.attributes.get(ATTR_LONGITUDE) if zone_state else None
)
# Check for devices in the monitored zone.
for device in self.proximity_devices:
if (device_state := self.hass.states.get(device)) is None:
devices_to_calculate = True
continue
if device_state.state not in self.ignored_zones:
devices_to_calculate = True
# Check the location of all devices.
if (device_state.state).lower() == (self.proximity_zone).lower():
device_friendly = device_state.name
if devices_in_zone != "":
devices_in_zone = f"{devices_in_zone}, "
devices_in_zone = devices_in_zone + device_friendly
# No-one to track so reset the entity.
if not devices_to_calculate:
_LOGGER.debug("no devices_to_calculate -> abort")
return {
"dist_to_zone": DEFAULT_DIST_TO_ZONE,
"dir_of_travel": DEFAULT_DIR_OF_TRAVEL,
"nearest": DEFAULT_NEAREST,
}
# At least one device is in the monitored zone so update the entity.
if devices_in_zone != "":
_LOGGER.debug("at least on device is in zone -> arrived")
return {
"dist_to_zone": 0,
"dir_of_travel": "arrived",
"nearest": devices_in_zone,
}
# Collect distances to the zone for all devices.
distances_to_zone: dict[str, float] = {}
for device in self.proximity_devices:
# Ignore devices in an ignored zone.
device_state = self.hass.states.get(device)
if not device_state or device_state.state in self.ignored_zones:
continue
# Ignore devices if proximity cannot be calculated.
if "latitude" not in device_state.attributes:
continue
# Calculate the distance to the proximity zone.
proximity = distance(
proximity_latitude,
proximity_longitude,
device_state.attributes[ATTR_LATITUDE],
device_state.attributes[ATTR_LONGITUDE],
)
# Add the device and distance to a dictionary.
if not proximity:
continue
distances_to_zone[device] = round(
DistanceConverter.convert(
proximity, UnitOfLength.METERS, self.unit_of_measurement
),
1,
)
# Loop through each of the distances collected and work out the
# closest.
closest_device: str | None = None
dist_to_zone: float | None = None
for device, zone in distances_to_zone.items():
if not dist_to_zone or zone < dist_to_zone:
closest_device = device
dist_to_zone = zone
# If the closest device is one of the other devices.
if (
closest_device is not None
and closest_device != self.state_change_data.entity_id
):
_LOGGER.debug("closest device is one of the other devices -> unknown")
device_state = self.hass.states.get(closest_device)
assert device_state
return {
"dist_to_zone": round(distances_to_zone[closest_device]),
"dir_of_travel": "unknown",
"nearest": device_state.name,
}
# Stop if we cannot calculate the direction of travel (i.e. we don't
# have a previous state and a current LAT and LONG).
if (
self.state_change_data.old_state is None
or "latitude" not in self.state_change_data.old_state.attributes
):
_LOGGER.debug("no lat and lon in old_state -> unknown")
return {
"dist_to_zone": round(
distances_to_zone[self.state_change_data.entity_id]
),
"dir_of_travel": "unknown",
"nearest": entity_name,
}
# Reset the variables
distance_travelled: float = 0
# Calculate the distance travelled.
old_distance = distance(
proximity_latitude,
proximity_longitude,
self.state_change_data.old_state.attributes[ATTR_LATITUDE],
self.state_change_data.old_state.attributes[ATTR_LONGITUDE],
)
new_distance = distance(
proximity_latitude,
proximity_longitude,
self.state_change_data.new_state.attributes[ATTR_LATITUDE],
self.state_change_data.new_state.attributes[ATTR_LONGITUDE],
)
assert new_distance is not None and old_distance is not None
distance_travelled = round(new_distance - old_distance, 1)
# Check for tolerance
if distance_travelled < self.tolerance * -1:
direction_of_travel = "towards"
elif distance_travelled > self.tolerance:
direction_of_travel = "away_from"
else:
direction_of_travel = "stationary"
# Update the proximity entity
dist_to: float | str
if dist_to_zone is not None:
dist_to = round(dist_to_zone)
else:
dist_to = DEFAULT_DIST_TO_ZONE
_LOGGER.debug(
"proximity.%s update entity: distance=%s: direction=%s: device=%s",
self.friendly_name,
dist_to,
direction_of_travel,
entity_name,
)
_LOGGER.info("%s: proximity calculation complete", entity_name)
return {
"dist_to_zone": dist_to,
"dir_of_travel": direction_of_travel,
"nearest": entity_name,
}