mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 00:37:53 +00:00
Add helpers.location.coordinates (#37234)
This commit is contained in:
parent
3eb6a68d12
commit
eb66da6436
@ -1,11 +1,18 @@
|
||||
"""Location helpers for Home Assistant."""
|
||||
|
||||
import logging
|
||||
from typing import Optional, Sequence
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE
|
||||
from homeassistant.core import State
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.typing import HomeAssistantType
|
||||
from homeassistant.util import location as loc_util
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def has_location(state: State) -> bool:
|
||||
"""Test if state contains a valid location.
|
||||
@ -41,3 +48,61 @@ def closest(
|
||||
longitude,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def find_coordinates(
|
||||
hass: HomeAssistantType, entity_id: str, recursion_history: Optional[list] = None
|
||||
) -> Optional[str]:
|
||||
"""Find the gps coordinates of the entity in the form of '90.000,180.000'."""
|
||||
entity_state = hass.states.get(entity_id)
|
||||
|
||||
if entity_state is None:
|
||||
_LOGGER.error("Unable to find entity %s", entity_id)
|
||||
return None
|
||||
|
||||
# Check if the entity has location attributes
|
||||
if has_location(entity_state):
|
||||
return _get_location_from_attributes(entity_state)
|
||||
|
||||
# Check if device is in a zone
|
||||
zone_entity = hass.states.get(f"zone.{entity_state.state}")
|
||||
if has_location(zone_entity): # type: ignore
|
||||
_LOGGER.debug(
|
||||
"%s is in %s, getting zone location", entity_id, zone_entity.entity_id # type: ignore
|
||||
)
|
||||
return _get_location_from_attributes(zone_entity) # type: ignore
|
||||
|
||||
# Resolve nested entity
|
||||
if recursion_history is None:
|
||||
recursion_history = []
|
||||
recursion_history.append(entity_id)
|
||||
if entity_state.state in recursion_history:
|
||||
_LOGGER.error(
|
||||
"Circular reference detected while trying to find coordinates of an entity. The state of %s has already been checked",
|
||||
entity_state.state,
|
||||
)
|
||||
return None
|
||||
_LOGGER.debug("Getting nested entity for state: %s", entity_state.state)
|
||||
nested_entity = hass.states.get(entity_state.state)
|
||||
if nested_entity is not None:
|
||||
_LOGGER.debug("Resolving nested entity_id: %s", entity_state.state)
|
||||
return find_coordinates(hass, entity_state.state, recursion_history)
|
||||
|
||||
# Check if state is valid coordinate set
|
||||
try:
|
||||
cv.gps(entity_state.state.split(","))
|
||||
except vol.Invalid:
|
||||
_LOGGER.error(
|
||||
"Entity %s does not contain a location and does not point at an entity that does: %s",
|
||||
entity_id,
|
||||
entity_state.state,
|
||||
)
|
||||
return None
|
||||
else:
|
||||
return entity_state.state
|
||||
|
||||
|
||||
def _get_location_from_attributes(entity_state: State) -> str:
|
||||
"""Get the lat/long string from an entities attributes."""
|
||||
attr = entity_state.attributes
|
||||
return "{},{}".format(attr.get(ATTR_LATITUDE), attr.get(ATTR_LONGITUDE))
|
||||
|
@ -43,3 +43,64 @@ def test_closest_returns_closest():
|
||||
state2 = State("light.test", "on", {ATTR_LATITUDE: 125.45, ATTR_LONGITUDE: 125.45})
|
||||
|
||||
assert state == location.closest(123.45, 123.45, [state, state2])
|
||||
|
||||
|
||||
async def test_coordinates_function_as_attributes(hass):
|
||||
"""Test coordinates function."""
|
||||
hass.states.async_set(
|
||||
"test.object", "happy", {"latitude": 32.87336, "longitude": -117.22943}
|
||||
)
|
||||
assert location.find_coordinates(hass, "test.object") == "32.87336,-117.22943"
|
||||
|
||||
|
||||
async def test_coordinates_function_as_state(hass):
|
||||
"""Test coordinates function."""
|
||||
hass.states.async_set("test.object", "32.87336,-117.22943")
|
||||
assert location.find_coordinates(hass, "test.object") == "32.87336,-117.22943"
|
||||
|
||||
|
||||
async def test_coordinates_function_device_tracker_in_zone(hass):
|
||||
"""Test coordinates function."""
|
||||
hass.states.async_set(
|
||||
"zone.home", "zoning", {"latitude": 32.87336, "longitude": -117.22943},
|
||||
)
|
||||
hass.states.async_set("device_tracker.device", "home")
|
||||
assert (
|
||||
location.find_coordinates(hass, "device_tracker.device")
|
||||
== "32.87336,-117.22943"
|
||||
)
|
||||
|
||||
|
||||
async def test_coordinates_function_device_tracker_from_input_select(hass):
|
||||
"""Test coordinates function."""
|
||||
hass.states.async_set(
|
||||
"input_select.select",
|
||||
"device_tracker.device",
|
||||
{"options": "device_tracker.device"},
|
||||
)
|
||||
hass.states.async_set("device_tracker.device", "32.87336,-117.22943")
|
||||
assert (
|
||||
location.find_coordinates(hass, "input_select.select") == "32.87336,-117.22943"
|
||||
)
|
||||
|
||||
|
||||
def test_coordinates_function_returns_none_on_recursion(hass):
|
||||
"""Test coordinates function."""
|
||||
hass.states.async_set(
|
||||
"test.first", "test.second",
|
||||
)
|
||||
hass.states.async_set("test.second", "test.first")
|
||||
assert location.find_coordinates(hass, "test.first") is None
|
||||
|
||||
|
||||
async def test_coordinates_function_returns_none_if_invalid_coord(hass):
|
||||
"""Test test_coordinates function."""
|
||||
hass.states.async_set(
|
||||
"test.object", "abc",
|
||||
)
|
||||
assert location.find_coordinates(hass, "test.object") is None
|
||||
|
||||
|
||||
def test_coordinates_function_returns_none_if_invalid_input(hass):
|
||||
"""Test test_coordinates function."""
|
||||
assert location.find_coordinates(hass, "test.abc") is None
|
||||
|
Loading…
x
Reference in New Issue
Block a user