Convert life360 integration to entity based (#72461)

* Convert life360 integration to entity based

* Improve config_flow.py type checking

* Add tests for config flow

Fix form defaults for reauth flow.

* Cover reauth when config entry loaded

* Update per review (except for dataclasses)

* Restore check for missing location information

This is in current code but was accidentally removed in this PR.

* Fix updates from review

* Update tests per review changes

* Change IntegData to a dataclass

* Use dataclasses to represent fetched Life360 data

* Always add extra attributes

* Update per review take 2

* Tweak handling of bad last_seen or location_accuracy

* Fix type of Life360Member.gps_accuracy

* Update per review take 3

* Update .coveragerc

* Parametrize successful reauth flow test

* Fix test coverage failure

* Update per review take 4

* Fix config schema
This commit is contained in:
Phil Bruckner 2022-06-29 11:40:02 -05:00 committed by GitHub
parent b6f16f87a7
commit 0a65f53356
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1109 additions and 671 deletions

View File

@ -640,8 +640,8 @@ omit =
homeassistant/components/lg_soundbar/media_player.py
homeassistant/components/life360/__init__.py
homeassistant/components/life360/const.py
homeassistant/components/life360/coordinator.py
homeassistant/components/life360/device_tracker.py
homeassistant/components/life360/helpers.py
homeassistant/components/lifx/__init__.py
homeassistant/components/lifx/const.py
homeassistant/components/lifx/light.py

View File

@ -572,6 +572,7 @@ build.json @home-assistant/supervisor
/tests/components/lcn/ @alengwenus
/homeassistant/components/lg_netcast/ @Drafteed
/homeassistant/components/life360/ @pnbruckner
/tests/components/life360/ @pnbruckner
/homeassistant/components/lifx/ @Djelibeybi
/homeassistant/components/light/ @home-assistant/core
/tests/components/light/ @home-assistant/core

View File

@ -1,11 +1,14 @@
"""Life360 integration."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.device_tracker import CONF_SCAN_INTERVAL
from homeassistant.components.device_tracker.const import (
SCAN_INTERVAL as DEFAULT_SCAN_INTERVAL,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_EXCLUDE,
@ -16,12 +19,10 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
from .const import (
CONF_AUTHORIZATION,
CONF_CIRCLES,
CONF_DRIVING_SPEED,
CONF_ERROR_THRESHOLD,
@ -30,182 +31,147 @@ from .const import (
CONF_MEMBERS,
CONF_SHOW_AS_STATE,
CONF_WARNING_THRESHOLD,
DEFAULT_OPTIONS,
DOMAIN,
LOGGER,
SHOW_DRIVING,
SHOW_MOVING,
)
from .helpers import get_api
from .coordinator import Life360DataUpdateCoordinator
DEFAULT_PREFIX = DOMAIN
PLATFORMS = [Platform.DEVICE_TRACKER]
CONF_ACCOUNTS = "accounts"
SHOW_AS_STATE_OPTS = [SHOW_DRIVING, SHOW_MOVING]
def _excl_incl_list_to_filter_dict(value):
return {
"include": CONF_INCLUDE in value,
"list": value.get(CONF_EXCLUDE) or value.get(CONF_INCLUDE),
}
def _prefix(value):
if not value:
return ""
if not value.endswith("_"):
return f"{value}_"
return value
def _thresholds(config):
error_threshold = config.get(CONF_ERROR_THRESHOLD)
warning_threshold = config.get(CONF_WARNING_THRESHOLD)
if error_threshold and warning_threshold:
if error_threshold <= warning_threshold:
raise vol.Invalid(
f"{CONF_ERROR_THRESHOLD} must be larger than {CONF_WARNING_THRESHOLD}"
def _show_as_state(config: dict) -> dict:
if opts := config.pop(CONF_SHOW_AS_STATE):
if SHOW_DRIVING in opts:
config[SHOW_DRIVING] = True
if SHOW_MOVING in opts:
LOGGER.warning(
"%s is no longer supported as an option for %s",
SHOW_MOVING,
CONF_SHOW_AS_STATE,
)
elif not error_threshold and warning_threshold:
config[CONF_ERROR_THRESHOLD] = warning_threshold + 1
elif error_threshold and not warning_threshold:
# Make them the same which effectively prevents warnings.
config[CONF_WARNING_THRESHOLD] = error_threshold
else:
# Log all errors as errors.
config[CONF_ERROR_THRESHOLD] = 1
config[CONF_WARNING_THRESHOLD] = 1
return config
ACCOUNT_SCHEMA = vol.Schema(
{vol.Required(CONF_USERNAME): cv.string, vol.Required(CONF_PASSWORD): cv.string}
)
def _unsupported(unsupported: set[str]) -> Callable[[dict], dict]:
"""Warn about unsupported options and remove from config."""
_SLUG_LIST = vol.All(
cv.ensure_list, [cv.slugify], vol.Length(min=1, msg="List cannot be empty")
)
def validator(config: dict) -> dict:
if unsupported_keys := unsupported & set(config):
LOGGER.warning(
"The following options are no longer supported: %s",
", ".join(sorted(unsupported_keys)),
)
return {k: v for k, v in config.items() if k not in unsupported}
_LOWER_STRING_LIST = vol.All(
cv.ensure_list,
[vol.All(cv.string, vol.Lower)],
vol.Length(min=1, msg="List cannot be empty"),
)
return validator
_EXCL_INCL_SLUG_LIST = vol.All(
vol.Schema(
{
vol.Exclusive(CONF_EXCLUDE, "incl_excl"): _SLUG_LIST,
vol.Exclusive(CONF_INCLUDE, "incl_excl"): _SLUG_LIST,
}
),
cv.has_at_least_one_key(CONF_EXCLUDE, CONF_INCLUDE),
_excl_incl_list_to_filter_dict,
)
_EXCL_INCL_LOWER_STRING_LIST = vol.All(
vol.Schema(
{
vol.Exclusive(CONF_EXCLUDE, "incl_excl"): _LOWER_STRING_LIST,
vol.Exclusive(CONF_INCLUDE, "incl_excl"): _LOWER_STRING_LIST,
}
),
cv.has_at_least_one_key(CONF_EXCLUDE, CONF_INCLUDE),
_excl_incl_list_to_filter_dict,
)
_THRESHOLD = vol.All(vol.Coerce(int), vol.Range(min=1))
ACCOUNT_SCHEMA = {
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
}
CIRCLES_MEMBERS = {
vol.Optional(CONF_EXCLUDE): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_INCLUDE): vol.All(cv.ensure_list, [cv.string]),
}
LIFE360_SCHEMA = vol.All(
vol.Schema(
{
vol.Optional(CONF_ACCOUNTS): vol.All(
cv.ensure_list, [ACCOUNT_SCHEMA], vol.Length(min=1)
),
vol.Optional(CONF_CIRCLES): _EXCL_INCL_LOWER_STRING_LIST,
vol.Optional(CONF_ACCOUNTS): vol.All(cv.ensure_list, [ACCOUNT_SCHEMA]),
vol.Optional(CONF_CIRCLES): CIRCLES_MEMBERS,
vol.Optional(CONF_DRIVING_SPEED): vol.Coerce(float),
vol.Optional(CONF_ERROR_THRESHOLD): _THRESHOLD,
vol.Optional(CONF_ERROR_THRESHOLD): vol.Coerce(int),
vol.Optional(CONF_MAX_GPS_ACCURACY): vol.Coerce(float),
vol.Optional(CONF_MAX_UPDATE_WAIT): vol.All(
cv.time_period, cv.positive_timedelta
),
vol.Optional(CONF_MEMBERS): _EXCL_INCL_SLUG_LIST,
vol.Optional(CONF_PREFIX, default=DEFAULT_PREFIX): vol.All(
vol.Any(None, cv.string), _prefix
),
vol.Optional(
CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL
): cv.time_period,
vol.Optional(CONF_MAX_UPDATE_WAIT): cv.time_period,
vol.Optional(CONF_MEMBERS): CIRCLES_MEMBERS,
vol.Optional(CONF_PREFIX): vol.Any(None, cv.string),
vol.Optional(CONF_SCAN_INTERVAL): cv.time_period,
vol.Optional(CONF_SHOW_AS_STATE, default=[]): vol.All(
cv.ensure_list, [vol.In(SHOW_AS_STATE_OPTS)]
),
vol.Optional(CONF_WARNING_THRESHOLD): _THRESHOLD,
vol.Optional(CONF_WARNING_THRESHOLD): vol.Coerce(int),
}
),
_thresholds,
_unsupported(
{
CONF_ACCOUNTS,
CONF_CIRCLES,
CONF_ERROR_THRESHOLD,
CONF_MAX_UPDATE_WAIT,
CONF_MEMBERS,
CONF_PREFIX,
CONF_SCAN_INTERVAL,
CONF_WARNING_THRESHOLD,
}
),
_show_as_state,
)
CONFIG_SCHEMA = vol.Schema(
vol.All({DOMAIN: LIFE360_SCHEMA}, cv.removed(DOMAIN, raise_if_present=False)),
extra=vol.ALLOW_EXTRA,
)
CONFIG_SCHEMA = vol.Schema({DOMAIN: LIFE360_SCHEMA}, extra=vol.ALLOW_EXTRA)
@dataclass
class IntegData:
"""Integration data."""
cfg_options: dict[str, Any] | None = None
# ConfigEntry.unique_id: Life360DataUpdateCoordinator
coordinators: dict[str, Life360DataUpdateCoordinator] = field(
init=False, default_factory=dict
)
# member_id: ConfigEntry.unique_id
tracked_members: dict[str, str] = field(init=False, default_factory=dict)
logged_circles: list[str] = field(init=False, default_factory=list)
logged_places: list[str] = field(init=False, default_factory=list)
def __post_init__(self):
"""Finish initialization of cfg_options."""
self.cfg_options = self.cfg_options or {}
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up integration."""
conf = config.get(DOMAIN, LIFE360_SCHEMA({}))
hass.data[DOMAIN] = {"config": conf, "apis": {}}
discovery.load_platform(hass, Platform.DEVICE_TRACKER, DOMAIN, None, config)
if CONF_ACCOUNTS not in conf:
return True
# Check existing config entries. For any that correspond to an entry in
# configuration.yaml, and whose password has not changed, nothing needs to
# be done with that config entry or that account from configuration.yaml.
# But if the config entry was created by import and the account no longer
# exists in configuration.yaml, or if the password has changed, then delete
# that out-of-date config entry.
already_configured = []
for entry in hass.config_entries.async_entries(DOMAIN):
# Find corresponding configuration.yaml entry and its password.
password = None
for account in conf[CONF_ACCOUNTS]:
if account[CONF_USERNAME] == entry.data[CONF_USERNAME]:
password = account[CONF_PASSWORD]
if password == entry.data[CONF_PASSWORD]:
already_configured.append(entry.data[CONF_USERNAME])
continue
if (
not password
and entry.source == config_entries.SOURCE_IMPORT
or password
and password != entry.data[CONF_PASSWORD]
):
hass.async_create_task(hass.config_entries.async_remove(entry.entry_id))
# Create config entries for accounts listed in configuration.
for account in conf[CONF_ACCOUNTS]:
if account[CONF_USERNAME] not in already_configured:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=account,
)
)
hass.data.setdefault(DOMAIN, IntegData(config.get(DOMAIN)))
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up config entry."""
hass.data[DOMAIN]["apis"][entry.data[CONF_USERNAME]] = get_api(
entry.data[CONF_AUTHORIZATION]
)
hass.data.setdefault(DOMAIN, IntegData())
# Check if this entry was created when this was a "legacy" tracker. If it was,
# update with missing data.
if not entry.unique_id:
hass.config_entries.async_update_entry(
entry,
unique_id=entry.data[CONF_USERNAME].lower(),
options=DEFAULT_OPTIONS | hass.data[DOMAIN].cfg_options,
)
coordinator = Life360DataUpdateCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN].coordinators[entry.entry_id] = coordinator
# Set up components for our platforms.
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload config entry."""
try:
hass.data[DOMAIN]["apis"].pop(entry.data[CONF_USERNAME])
return True
except KeyError:
return False
del hass.data[DOMAIN].coordinators[entry.entry_id]
# Unload components for our platforms.
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@ -1,108 +1,199 @@
"""Config flow to configure Life360 integration."""
from collections import OrderedDict
import logging
from life360 import Life360Error, LoginError
from __future__ import annotations
from collections.abc import Mapping
from typing import Any, cast
from life360 import Life360, Life360Error, LoginError
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.config_entries import ConfigEntry, ConfigFlow, OptionsFlow
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
import homeassistant.helpers.config_validation as cv
from .const import CONF_AUTHORIZATION, DOMAIN
from .helpers import get_api
from .const import (
COMM_MAX_RETRIES,
COMM_TIMEOUT,
CONF_AUTHORIZATION,
CONF_DRIVING_SPEED,
CONF_MAX_GPS_ACCURACY,
DEFAULT_OPTIONS,
DOMAIN,
LOGGER,
OPTIONS,
SHOW_DRIVING,
)
_LOGGER = logging.getLogger(__name__)
DOCS_URL = "https://www.home-assistant.io/integrations/life360"
LIMIT_GPS_ACC = "limit_gps_acc"
SET_DRIVE_SPEED = "set_drive_speed"
class Life360ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
def account_schema(
def_username: str | vol.UNDEFINED = vol.UNDEFINED,
def_password: str | vol.UNDEFINED = vol.UNDEFINED,
) -> dict[vol.Marker, Any]:
"""Return schema for an account with optional default values."""
return {
vol.Required(CONF_USERNAME, default=def_username): cv.string,
vol.Required(CONF_PASSWORD, default=def_password): cv.string,
}
def password_schema(
def_password: str | vol.UNDEFINED = vol.UNDEFINED,
) -> dict[vol.Marker, Any]:
"""Return schema for a password with optional default value."""
return {vol.Required(CONF_PASSWORD, default=def_password): cv.string}
class Life360ConfigFlow(ConfigFlow, domain=DOMAIN):
"""Life360 integration config flow."""
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialize."""
self._api = get_api()
self._username = vol.UNDEFINED
self._password = vol.UNDEFINED
self._api = Life360(timeout=COMM_TIMEOUT, max_retries=COMM_MAX_RETRIES)
self._username: str | vol.UNDEFINED = vol.UNDEFINED
self._password: str | vol.UNDEFINED = vol.UNDEFINED
self._reauth_entry: ConfigEntry | None = None
@property
def configured_usernames(self):
"""Return tuple of configured usernames."""
entries = self._async_current_entries()
if entries:
return (entry.data[CONF_USERNAME] for entry in entries)
return ()
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> Life360OptionsFlow:
"""Get the options flow for this handler."""
return Life360OptionsFlow(config_entry)
async def async_step_user(self, user_input=None):
"""Handle a user initiated config flow."""
errors = {}
if user_input is not None:
self._username = user_input[CONF_USERNAME]
self._password = user_input[CONF_PASSWORD]
try:
# pylint: disable=no-value-for-parameter
vol.Email()(self._username)
authorization = await self.hass.async_add_executor_job(
self._api.get_authorization, self._username, self._password
)
except vol.Invalid:
errors[CONF_USERNAME] = "invalid_username"
except LoginError:
errors["base"] = "invalid_auth"
except Life360Error as error:
_LOGGER.error(
"Unexpected error communicating with Life360 server: %s", error
)
errors["base"] = "unknown"
else:
if self._username in self.configured_usernames:
errors["base"] = "already_configured"
else:
return self.async_create_entry(
title=self._username,
data={
CONF_USERNAME: self._username,
CONF_PASSWORD: self._password,
CONF_AUTHORIZATION: authorization,
},
description_placeholders={"docs_url": DOCS_URL},
)
data_schema = OrderedDict()
data_schema[vol.Required(CONF_USERNAME, default=self._username)] = str
data_schema[vol.Required(CONF_PASSWORD, default=self._password)] = str
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(data_schema),
errors=errors,
description_placeholders={"docs_url": DOCS_URL},
)
async def async_step_import(self, user_input):
"""Import a config flow from configuration."""
username = user_input[CONF_USERNAME]
password = user_input[CONF_PASSWORD]
async def _async_verify(self, step_id: str) -> FlowResult:
"""Attempt to authorize the provided credentials."""
errors: dict[str, str] = {}
try:
authorization = await self.hass.async_add_executor_job(
self._api.get_authorization, username, password
self._api.get_authorization, self._username, self._password
)
except LoginError:
_LOGGER.error("Invalid credentials for %s", username)
return self.async_abort(reason="invalid_auth")
except Life360Error as error:
_LOGGER.error(
"Unexpected error communicating with Life360 server: %s", error
except LoginError as exc:
LOGGER.debug("Login error: %s", exc)
errors["base"] = "invalid_auth"
except Life360Error as exc:
LOGGER.debug("Unexpected error communicating with Life360 server: %s", exc)
errors["base"] = "cannot_connect"
if errors:
if step_id == "user":
schema = account_schema(self._username, self._password)
else:
schema = password_schema(self._password)
return self.async_show_form(
step_id=step_id, data_schema=vol.Schema(schema), errors=errors
)
return self.async_abort(reason="unknown")
data = {
CONF_USERNAME: self._username,
CONF_PASSWORD: self._password,
CONF_AUTHORIZATION: authorization,
}
if self._reauth_entry:
LOGGER.debug("Reauthorization successful")
self.hass.config_entries.async_update_entry(self._reauth_entry, data=data)
self.hass.async_create_task(
self.hass.config_entries.async_reload(self._reauth_entry.entry_id)
)
return self.async_abort(reason="reauth_successful")
return self.async_create_entry(
title=f"{username} (from configuration)",
data={
CONF_USERNAME: username,
CONF_PASSWORD: password,
CONF_AUTHORIZATION: authorization,
},
title=cast(str, self.unique_id), data=data, options=DEFAULT_OPTIONS
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle a config flow initiated by the user."""
if not user_input:
return self.async_show_form(
step_id="user", data_schema=vol.Schema(account_schema())
)
self._username = user_input[CONF_USERNAME]
self._password = user_input[CONF_PASSWORD]
await self.async_set_unique_id(self._username.lower())
self._abort_if_unique_id_configured()
return await self._async_verify("user")
async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult:
"""Handle reauthorization."""
self._username = data[CONF_USERNAME]
self._reauth_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"]
)
# Always start with current credentials since they may still be valid and a
# simple reauthorization will be successful.
return await self.async_step_reauth_confirm(dict(data))
async def async_step_reauth_confirm(self, user_input: dict[str, Any]) -> FlowResult:
"""Handle reauthorization completion."""
self._password = user_input[CONF_PASSWORD]
return await self._async_verify("reauth_confirm")
class Life360OptionsFlow(OptionsFlow):
"""Life360 integration options flow."""
def __init__(self, config_entry: ConfigEntry) -> None:
"""Initialize."""
self.config_entry = config_entry
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle account options."""
options = self.config_entry.options
if user_input is not None:
new_options = _extract_account_options(user_input)
return self.async_create_entry(title="", data=new_options)
return self.async_show_form(
step_id="init", data_schema=vol.Schema(_account_options_schema(options))
)
def _account_options_schema(options: Mapping[str, Any]) -> dict[vol.Marker, Any]:
"""Create schema for account options form."""
def_limit_gps_acc = options[CONF_MAX_GPS_ACCURACY] is not None
def_max_gps = options[CONF_MAX_GPS_ACCURACY] or vol.UNDEFINED
def_set_drive_speed = options[CONF_DRIVING_SPEED] is not None
def_speed = options[CONF_DRIVING_SPEED] or vol.UNDEFINED
def_show_driving = options[SHOW_DRIVING]
return {
vol.Required(LIMIT_GPS_ACC, default=def_limit_gps_acc): bool,
vol.Optional(CONF_MAX_GPS_ACCURACY, default=def_max_gps): vol.Coerce(float),
vol.Required(SET_DRIVE_SPEED, default=def_set_drive_speed): bool,
vol.Optional(CONF_DRIVING_SPEED, default=def_speed): vol.Coerce(float),
vol.Optional(SHOW_DRIVING, default=def_show_driving): bool,
}
def _extract_account_options(user_input: dict) -> dict[str, Any]:
"""Remove options from user input and return as a separate dict."""
result = {}
for key in OPTIONS:
value = user_input.pop(key, None)
# Was "include" checkbox (if there was one) corresponding to option key True
# (meaning option should be included)?
incl = user_input.pop(
{
CONF_MAX_GPS_ACCURACY: LIMIT_GPS_ACC,
CONF_DRIVING_SPEED: SET_DRIVE_SPEED,
}.get(key),
True,
)
result[key] = value if incl else None
return result

View File

@ -1,5 +1,25 @@
"""Constants for Life360 integration."""
from datetime import timedelta
import logging
DOMAIN = "life360"
LOGGER = logging.getLogger(__package__)
ATTRIBUTION = "Data provided by life360.com"
COMM_MAX_RETRIES = 2
COMM_TIMEOUT = 3.05
SPEED_FACTOR_MPH = 2.25
SPEED_DIGITS = 1
UPDATE_INTERVAL = timedelta(seconds=10)
ATTR_ADDRESS = "address"
ATTR_AT_LOC_SINCE = "at_loc_since"
ATTR_DRIVING = "driving"
ATTR_LAST_SEEN = "last_seen"
ATTR_PLACE = "place"
ATTR_SPEED = "speed"
ATTR_WIFI_ON = "wifi_on"
CONF_AUTHORIZATION = "authorization"
CONF_CIRCLES = "circles"
@ -13,3 +33,10 @@ CONF_WARNING_THRESHOLD = "warning_threshold"
SHOW_DRIVING = "driving"
SHOW_MOVING = "moving"
DEFAULT_OPTIONS = {
CONF_DRIVING_SPEED: None,
CONF_MAX_GPS_ACCURACY: None,
SHOW_DRIVING: False,
}
OPTIONS = list(DEFAULT_OPTIONS.keys())

View File

@ -0,0 +1,201 @@
"""DataUpdateCoordinator for the Life360 integration."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from life360 import Life360, Life360Error, LoginError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
LENGTH_FEET,
LENGTH_KILOMETERS,
LENGTH_METERS,
LENGTH_MILES,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util.distance import convert
import homeassistant.util.dt as dt_util
from .const import (
COMM_MAX_RETRIES,
COMM_TIMEOUT,
CONF_AUTHORIZATION,
DOMAIN,
LOGGER,
SPEED_DIGITS,
SPEED_FACTOR_MPH,
UPDATE_INTERVAL,
)
@dataclass
class Life360Place:
"""Life360 Place data."""
name: str
latitude: float
longitude: float
radius: float
@dataclass
class Life360Circle:
"""Life360 Circle data."""
name: str
places: dict[str, Life360Place]
@dataclass
class Life360Member:
"""Life360 Member data."""
# Don't include address field in eq comparison because it often changes (back and
# forth) between updates. If it was included there would be way more state changes
# and database updates than is useful.
address: str | None = field(compare=False)
at_loc_since: datetime
battery_charging: bool
battery_level: int
driving: bool
entity_picture: str
gps_accuracy: int
last_seen: datetime
latitude: float
longitude: float
name: str
place: str | None
speed: float
wifi_on: bool
@dataclass
class Life360Data:
"""Life360 data."""
circles: dict[str, Life360Circle] = field(init=False, default_factory=dict)
members: dict[str, Life360Member] = field(init=False, default_factory=dict)
class Life360DataUpdateCoordinator(DataUpdateCoordinator):
"""Life360 data update coordinator."""
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Initialize data update coordinator."""
super().__init__(
hass,
LOGGER,
name=f"{DOMAIN} ({entry.unique_id})",
update_interval=UPDATE_INTERVAL,
)
self._hass = hass
self._api = Life360(
timeout=COMM_TIMEOUT,
max_retries=COMM_MAX_RETRIES,
authorization=entry.data[CONF_AUTHORIZATION],
)
async def _retrieve_data(self, func: str, *args: Any) -> list[dict[str, Any]]:
"""Get data from Life360."""
try:
return await self._hass.async_add_executor_job(
getattr(self._api, func), *args
)
except LoginError as exc:
LOGGER.debug("Login error: %s", exc)
raise ConfigEntryAuthFailed from exc
except Life360Error as exc:
LOGGER.debug("%s: %s", exc.__class__.__name__, exc)
raise UpdateFailed from exc
async def _async_update_data(self) -> Life360Data:
"""Get & process data from Life360."""
data = Life360Data()
for circle in await self._retrieve_data("get_circles"):
circle_id = circle["id"]
circle_members = await self._retrieve_data("get_circle_members", circle_id)
circle_places = await self._retrieve_data("get_circle_places", circle_id)
data.circles[circle_id] = Life360Circle(
circle["name"],
{
place["id"]: Life360Place(
place["name"],
float(place["latitude"]),
float(place["longitude"]),
float(place["radius"]),
)
for place in circle_places
},
)
for member in circle_members:
# Member isn't sharing location.
if not int(member["features"]["shareLocation"]):
continue
# Note that member may be in more than one circle. If that's the case just
# go ahead and process the newly retrieved data (overwriting the older
# data), since it might be slightly newer than what was retrieved while
# processing another circle.
first = member["firstName"]
last = member["lastName"]
if first and last:
name = " ".join([first, last])
else:
name = first or last
loc = member["location"]
if not loc:
if err_msg := member["issues"]["title"]:
if member["issues"]["dialog"]:
err_msg += f": {member['issues']['dialog']}"
else:
err_msg = "Location information missing"
LOGGER.error("%s: %s", name, err_msg)
continue
place = loc["name"] or None
if place:
address: str | None = place
else:
address1 = loc["address1"] or None
address2 = loc["address2"] or None
if address1 and address2:
address = ", ".join([address1, address2])
else:
address = address1 or address2
speed = max(0, float(loc["speed"]) * SPEED_FACTOR_MPH)
if self._hass.config.units.is_metric:
speed = convert(speed, LENGTH_MILES, LENGTH_KILOMETERS)
data.members[member["id"]] = Life360Member(
address,
dt_util.utc_from_timestamp(int(loc["since"])),
bool(int(loc["charge"])),
int(float(loc["battery"])),
bool(int(loc["isDriving"])),
member["avatar"],
# Life360 reports accuracy in feet, but Device Tracker expects
# gps_accuracy in meters.
round(convert(float(loc["accuracy"]), LENGTH_FEET, LENGTH_METERS)),
dt_util.utc_from_timestamp(int(loc["timestamp"])),
float(loc["latitude"]),
float(loc["longitude"]),
name,
place,
round(speed, SPEED_DIGITS),
bool(int(loc["wifiState"])),
)
return data

View File

@ -1,432 +1,244 @@
"""Support for Life360 device tracking."""
from __future__ import annotations
from collections.abc import Callable
from datetime import timedelta
import logging
from collections.abc import Mapping
from typing import Any, cast
from life360 import Life360Error
import voluptuous as vol
from homeassistant.components.device_tracker import (
CONF_SCAN_INTERVAL,
DOMAIN as DEVICE_TRACKER_DOMAIN,
from homeassistant.components.device_tracker import SOURCE_TYPE_GPS
from homeassistant.components.device_tracker.config_entry import TrackerEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_BATTERY_CHARGING
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from homeassistant.components.zone import async_active_zone
from homeassistant.const import (
ATTR_BATTERY_CHARGING,
ATTR_ENTITY_ID,
CONF_PREFIX,
LENGTH_FEET,
LENGTH_KILOMETERS,
LENGTH_METERS,
LENGTH_MILES,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import track_time_interval
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util.async_ import run_callback_threadsafe
from homeassistant.util.distance import convert
import homeassistant.util.dt as dt_util
from .const import (
CONF_CIRCLES,
ATTR_ADDRESS,
ATTR_AT_LOC_SINCE,
ATTR_DRIVING,
ATTR_LAST_SEEN,
ATTR_PLACE,
ATTR_SPEED,
ATTR_WIFI_ON,
ATTRIBUTION,
CONF_DRIVING_SPEED,
CONF_ERROR_THRESHOLD,
CONF_MAX_GPS_ACCURACY,
CONF_MAX_UPDATE_WAIT,
CONF_MEMBERS,
CONF_SHOW_AS_STATE,
CONF_WARNING_THRESHOLD,
DOMAIN,
LOGGER,
SHOW_DRIVING,
SHOW_MOVING,
)
_LOGGER = logging.getLogger(__name__)
SPEED_FACTOR_MPH = 2.25
EVENT_DELAY = timedelta(seconds=30)
ATTR_ADDRESS = "address"
ATTR_AT_LOC_SINCE = "at_loc_since"
ATTR_DRIVING = "driving"
ATTR_LAST_SEEN = "last_seen"
ATTR_MOVING = "moving"
ATTR_PLACE = "place"
ATTR_RAW_SPEED = "raw_speed"
ATTR_SPEED = "speed"
ATTR_WAIT = "wait"
ATTR_WIFI_ON = "wifi_on"
EVENT_UPDATE_OVERDUE = "life360_update_overdue"
EVENT_UPDATE_RESTORED = "life360_update_restored"
_LOC_ATTRS = (
"address",
"at_loc_since",
"driving",
"gps_accuracy",
"last_seen",
"latitude",
"longitude",
"place",
"speed",
)
def _include_name(filter_dict, name):
if not name:
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up the device tracker platform."""
coordinator = hass.data[DOMAIN].coordinators[entry.entry_id]
tracked_members = hass.data[DOMAIN].tracked_members
logged_circles = hass.data[DOMAIN].logged_circles
logged_places = hass.data[DOMAIN].logged_places
@callback
def process_data(new_members_only: bool = True) -> None:
"""Process new Life360 data."""
for circle_id, circle in coordinator.data.circles.items():
if circle_id not in logged_circles:
logged_circles.append(circle_id)
LOGGER.debug("Circle: %s", circle.name)
new_places = []
for place_id, place in circle.places.items():
if place_id not in logged_places:
logged_places.append(place_id)
new_places.append(place)
if new_places:
msg = f"Places from {circle.name}:"
for place in new_places:
msg += f"\n- name: {place.name}"
msg += f"\n latitude: {place.latitude}"
msg += f"\n longitude: {place.longitude}"
msg += f"\n radius: {place.radius}"
LOGGER.debug(msg)
new_entities = []
for member_id, member in coordinator.data.members.items():
tracked_by_account = tracked_members.get(member_id)
if new_member := not tracked_by_account:
tracked_members[member_id] = entry.unique_id
LOGGER.debug("Member: %s", member.name)
if (
new_member
or tracked_by_account == entry.unique_id
and not new_members_only
):
new_entities.append(Life360DeviceTracker(coordinator, member_id))
if new_entities:
async_add_entities(new_entities)
process_data(new_members_only=False)
entry.async_on_unload(coordinator.async_add_listener(process_data))
class Life360DeviceTracker(CoordinatorEntity, TrackerEntity):
"""Life360 Device Tracker."""
_attr_attribution = ATTRIBUTION
def __init__(self, coordinator: DataUpdateCoordinator, member_id: str) -> None:
"""Initialize Life360 Entity."""
super().__init__(coordinator)
self._attr_unique_id = member_id
self._data = coordinator.data.members[self.unique_id]
self._attr_name = self._data.name
self._attr_entity_picture = self._data.entity_picture
self._prev_data = self._data
@property
def _options(self) -> Mapping[str, Any]:
"""Shortcut to config entry options."""
return cast(Mapping[str, Any], self.coordinator.config_entry.options)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
# Get a shortcut to this member's data. Can't guarantee it's the same dict every
# update, or that there is even data for this member every update, so need to
# update shortcut each time.
self._data = self.coordinator.data.members.get(self.unique_id)
if self.available:
# If nothing important has changed, then skip the update altogether.
if self._data == self._prev_data:
return
# Check if we should effectively throw out new location data.
last_seen = self._data.last_seen
prev_seen = self._prev_data.last_seen
max_gps_acc = self._options.get(CONF_MAX_GPS_ACCURACY)
bad_last_seen = last_seen < prev_seen
bad_accuracy = (
max_gps_acc is not None and self.location_accuracy > max_gps_acc
)
if bad_last_seen or bad_accuracy:
if bad_last_seen:
LOGGER.warning(
"%s: Ignoring location update because "
"last_seen (%s) < previous last_seen (%s)",
self.entity_id,
last_seen,
prev_seen,
)
if bad_accuracy:
LOGGER.warning(
"%s: Ignoring location update because "
"expected GPS accuracy (%0.1f) is not met: %i",
self.entity_id,
max_gps_acc,
self.location_accuracy,
)
# Overwrite new location related data with previous values.
for attr in _LOC_ATTRS:
setattr(self._data, attr, getattr(self._prev_data, attr))
self._prev_data = self._data
super()._handle_coordinator_update()
@property
def force_update(self) -> bool:
"""Return True if state updates should be forced."""
return False
if not filter_dict:
return True
name = name.lower()
if filter_dict["include"]:
return name in filter_dict["list"]
return name not in filter_dict["list"]
@property
def available(self) -> bool:
"""Return if entity is available."""
# Guard against member not being in last update for some reason.
return super().available and self._data is not None
def _exc_msg(exc):
return f"{exc.__class__.__name__}: {exc}"
@property
def entity_picture(self) -> str | None:
"""Return the entity picture to use in the frontend, if any."""
if self.available:
self._attr_entity_picture = self._data.entity_picture
return super().entity_picture
# All of the following will only be called if self.available is True.
def _dump_filter(filter_dict, desc, func=lambda x: x):
if not filter_dict:
return
_LOGGER.debug(
"%scluding %s: %s",
"In" if filter_dict["include"] else "Ex",
desc,
", ".join([func(name) for name in filter_dict["list"]]),
)
@property
def battery_level(self) -> int | None:
"""Return the battery level of the device.
Percentage from 0-100.
"""
return self._data.battery_level
def setup_scanner(
hass: HomeAssistant,
config: ConfigType,
see: Callable[..., None],
discovery_info: DiscoveryInfoType | None = None,
) -> bool:
"""Set up device scanner."""
config = hass.data[DOMAIN]["config"]
apis = hass.data[DOMAIN]["apis"]
Life360Scanner(hass, config, see, apis)
return True
@property
def source_type(self) -> str:
"""Return the source type, eg gps or router, of the device."""
return SOURCE_TYPE_GPS
@property
def location_accuracy(self) -> int:
"""Return the location accuracy of the device.
def _utc_from_ts(val):
try:
return dt_util.utc_from_timestamp(float(val))
except (TypeError, ValueError):
Value in meters.
"""
return self._data.gps_accuracy
@property
def driving(self) -> bool:
"""Return if driving."""
if (driving_speed := self._options.get(CONF_DRIVING_SPEED)) is not None:
if self._data.speed >= driving_speed:
return True
return self._data.driving
@property
def location_name(self) -> str | None:
"""Return a location name for the current location of the device."""
if self._options.get(SHOW_DRIVING) and self.driving:
return "Driving"
return None
@property
def latitude(self) -> float | None:
"""Return latitude value of the device."""
return self._data.latitude
def _dt_attr_from_ts(timestamp):
utc = _utc_from_ts(timestamp)
if utc:
return utc
return STATE_UNKNOWN
@property
def longitude(self) -> float | None:
"""Return longitude value of the device."""
return self._data.longitude
def _bool_attr_from_int(val):
try:
return bool(int(val))
except (TypeError, ValueError):
return STATE_UNKNOWN
class Life360Scanner:
"""Life360 device scanner."""
def __init__(self, hass, config, see, apis):
"""Initialize Life360Scanner."""
self._hass = hass
self._see = see
self._max_gps_accuracy = config.get(CONF_MAX_GPS_ACCURACY)
self._max_update_wait = config.get(CONF_MAX_UPDATE_WAIT)
self._prefix = config[CONF_PREFIX]
self._circles_filter = config.get(CONF_CIRCLES)
self._members_filter = config.get(CONF_MEMBERS)
self._driving_speed = config.get(CONF_DRIVING_SPEED)
self._show_as_state = config[CONF_SHOW_AS_STATE]
self._apis = apis
self._errs = {}
self._error_threshold = config[CONF_ERROR_THRESHOLD]
self._warning_threshold = config[CONF_WARNING_THRESHOLD]
self._max_errs = self._error_threshold + 1
self._dev_data = {}
self._circles_logged = set()
self._members_logged = set()
_dump_filter(self._circles_filter, "Circles")
_dump_filter(self._members_filter, "device IDs", self._dev_id)
self._started = dt_util.utcnow()
self._update_life360()
track_time_interval(
self._hass, self._update_life360, config[CONF_SCAN_INTERVAL]
)
def _dev_id(self, name):
return self._prefix + name
def _ok(self, key):
if self._errs.get(key, 0) >= self._max_errs:
_LOGGER.error("%s: OK again", key)
self._errs[key] = 0
def _err(self, key, err_msg):
_errs = self._errs.get(key, 0)
if _errs < self._max_errs:
self._errs[key] = _errs = _errs + 1
msg = f"{key}: {err_msg}"
if _errs >= self._error_threshold:
if _errs == self._max_errs:
msg = f"Suppressing further errors until OK: {msg}"
_LOGGER.error(msg)
elif _errs >= self._warning_threshold:
_LOGGER.warning(msg)
def _exc(self, key, exc):
self._err(key, _exc_msg(exc))
def _prev_seen(self, dev_id, last_seen):
prev_seen, reported = self._dev_data.get(dev_id, (None, False))
if self._max_update_wait:
now = dt_util.utcnow()
most_recent_update = last_seen or prev_seen or self._started
overdue = now - most_recent_update > self._max_update_wait
if overdue and not reported and now - self._started > EVENT_DELAY:
self._hass.bus.fire(
EVENT_UPDATE_OVERDUE,
{ATTR_ENTITY_ID: f"{DEVICE_TRACKER_DOMAIN}.{dev_id}"},
)
reported = True
elif not overdue and reported:
self._hass.bus.fire(
EVENT_UPDATE_RESTORED,
{
ATTR_ENTITY_ID: f"{DEVICE_TRACKER_DOMAIN}.{dev_id}",
ATTR_WAIT: str(last_seen - (prev_seen or self._started)).split(
".", maxsplit=1
)[0],
},
)
reported = False
# Don't remember last_seen unless it's really an update.
if not last_seen or prev_seen and last_seen <= prev_seen:
last_seen = prev_seen
self._dev_data[dev_id] = last_seen, reported
return prev_seen
def _update_member(self, member, dev_id):
loc = member.get("location")
try:
last_seen = _utc_from_ts(loc.get("timestamp"))
except AttributeError:
last_seen = None
prev_seen = self._prev_seen(dev_id, last_seen)
if not loc:
if err_msg := member["issues"]["title"]:
if member["issues"]["dialog"]:
err_msg += f": {member['issues']['dialog']}"
else:
err_msg = "Location information missing"
self._err(dev_id, err_msg)
return
# Only update when we truly have an update.
if not last_seen:
_LOGGER.warning("%s: Ignoring update because timestamp is missing", dev_id)
return
if prev_seen and last_seen < prev_seen:
_LOGGER.warning(
"%s: Ignoring update because timestamp is older than last timestamp",
dev_id,
)
_LOGGER.debug("%s < %s", last_seen, prev_seen)
return
if last_seen == prev_seen:
return
lat = loc.get("latitude")
lon = loc.get("longitude")
gps_accuracy = loc.get("accuracy")
try:
lat = float(lat)
lon = float(lon)
# Life360 reports accuracy in feet, but Device Tracker expects
# gps_accuracy in meters.
gps_accuracy = round(
convert(float(gps_accuracy), LENGTH_FEET, LENGTH_METERS)
)
except (TypeError, ValueError):
self._err(dev_id, f"GPS data invalid: {lat}, {lon}, {gps_accuracy}")
return
self._ok(dev_id)
msg = f"Updating {dev_id}"
if prev_seen:
msg += f"; Time since last update: {last_seen - prev_seen}"
_LOGGER.debug(msg)
if self._max_gps_accuracy is not None and gps_accuracy > self._max_gps_accuracy:
_LOGGER.warning(
"%s: Ignoring update because expected GPS "
"accuracy (%.0f) is not met: %.0f",
dev_id,
self._max_gps_accuracy,
gps_accuracy,
)
return
# Get raw attribute data, converting empty strings to None.
place = loc.get("name") or None
address1 = loc.get("address1") or None
address2 = loc.get("address2") or None
if address1 and address2:
address = ", ".join([address1, address2])
else:
address = address1 or address2
raw_speed = loc.get("speed") or None
driving = _bool_attr_from_int(loc.get("isDriving"))
moving = _bool_attr_from_int(loc.get("inTransit"))
try:
battery = int(float(loc.get("battery")))
except (TypeError, ValueError):
battery = None
# Try to convert raw speed into real speed.
try:
speed = float(raw_speed) * SPEED_FACTOR_MPH
if self._hass.config.units.is_metric:
speed = convert(speed, LENGTH_MILES, LENGTH_KILOMETERS)
speed = max(0, round(speed))
except (TypeError, ValueError):
speed = STATE_UNKNOWN
# Make driving attribute True if it isn't and we can derive that it
# should be True from other data.
if (
driving in (STATE_UNKNOWN, False)
and self._driving_speed is not None
and speed != STATE_UNKNOWN
):
driving = speed >= self._driving_speed
attrs = {
ATTR_ADDRESS: address,
ATTR_AT_LOC_SINCE: _dt_attr_from_ts(loc.get("since")),
ATTR_BATTERY_CHARGING: _bool_attr_from_int(loc.get("charge")),
ATTR_DRIVING: driving,
ATTR_LAST_SEEN: last_seen,
ATTR_MOVING: moving,
ATTR_PLACE: place,
ATTR_RAW_SPEED: raw_speed,
ATTR_SPEED: speed,
ATTR_WIFI_ON: _bool_attr_from_int(loc.get("wifiState")),
}
# If user wants driving or moving to be shown as state, and current
# location is not in a HA zone, then set location name accordingly.
loc_name = None
active_zone = run_callback_threadsafe(
self._hass.loop, async_active_zone, self._hass, lat, lon, gps_accuracy
).result()
if not active_zone:
if SHOW_DRIVING in self._show_as_state and driving is True:
loc_name = SHOW_DRIVING
elif SHOW_MOVING in self._show_as_state and moving is True:
loc_name = SHOW_MOVING
self._see(
dev_id=dev_id,
location_name=loc_name,
gps=(lat, lon),
gps_accuracy=gps_accuracy,
battery=battery,
attributes=attrs,
picture=member.get("avatar"),
)
def _update_members(self, members, members_updated):
for member in members:
member_id = member["id"]
if member_id in members_updated:
continue
err_key = "Member data"
try:
first = member.get("firstName")
last = member.get("lastName")
if first and last:
full_name = " ".join([first, last])
else:
full_name = first or last
slug_name = cv.slugify(full_name)
include_member = _include_name(self._members_filter, slug_name)
dev_id = self._dev_id(slug_name)
if member_id not in self._members_logged:
self._members_logged.add(member_id)
_LOGGER.debug(
"%s -> %s: will%s be tracked, id=%s",
full_name,
dev_id,
"" if include_member else " NOT",
member_id,
)
sharing = bool(int(member["features"]["shareLocation"]))
except (KeyError, TypeError, ValueError, vol.Invalid):
self._err(err_key, member)
continue
self._ok(err_key)
if include_member and sharing:
members_updated.append(member_id)
self._update_member(member, dev_id)
def _update_life360(self, now=None):
circles_updated = []
members_updated = []
for api in self._apis.values():
err_key = "get_circles"
try:
circles = api.get_circles()
except Life360Error as exc:
self._exc(err_key, exc)
continue
self._ok(err_key)
for circle in circles:
circle_id = circle["id"]
if circle_id in circles_updated:
continue
circles_updated.append(circle_id)
circle_name = circle["name"]
incl_circle = _include_name(self._circles_filter, circle_name)
if circle_id not in self._circles_logged:
self._circles_logged.add(circle_id)
_LOGGER.debug(
"%s Circle: will%s be included, id=%s",
circle_name,
"" if incl_circle else " NOT",
circle_id,
)
try:
places = api.get_circle_places(circle_id)
place_data = "Circle's Places:"
for place in places:
place_data += f"\n- name: {place['name']}"
place_data += f"\n latitude: {place['latitude']}"
place_data += f"\n longitude: {place['longitude']}"
place_data += f"\n radius: {place['radius']}"
if not places:
place_data += " None"
_LOGGER.debug(place_data)
except (Life360Error, KeyError):
pass
if incl_circle:
err_key = f'get_circle_members "{circle_name}"'
try:
members = api.get_circle_members(circle_id)
except Life360Error as exc:
self._exc(err_key, exc)
continue
self._ok(err_key)
self._update_members(members, members_updated)
@property
def extra_state_attributes(self) -> Mapping[str, Any] | None:
"""Return entity specific state attributes."""
attrs = {}
attrs[ATTR_ADDRESS] = self._data.address
attrs[ATTR_AT_LOC_SINCE] = self._data.at_loc_since
attrs[ATTR_BATTERY_CHARGING] = self._data.battery_charging
attrs[ATTR_DRIVING] = self.driving
attrs[ATTR_LAST_SEEN] = self._data.last_seen
attrs[ATTR_PLACE] = self._data.place
attrs[ATTR_SPEED] = self._data.speed
attrs[ATTR_WIFI_ON] = self._data.wifi_on
return attrs

View File

@ -1,7 +0,0 @@
"""Life360 integration helpers."""
from life360 import Life360
def get_api(authorization=None):
"""Create Life360 api object."""
return Life360(timeout=3.05, max_retries=2, authorization=authorization)

View File

@ -2,26 +2,43 @@
"config": {
"step": {
"user": {
"title": "Life360 Account Info",
"title": "Configure Life360 Account",
"data": {
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
},
"description": "To set advanced options, see [Life360 documentation]({docs_url}).\nYou may want to do that before adding accounts."
}
},
"reauth_confirm": {
"title": "[%key:common::config_flow::title::reauth%]",
"data": {
"password": "[%key:common::config_flow::data::password%]"
}
}
},
"error": {
"invalid_username": "Invalid username",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"create_entry": {
"default": "To set advanced options, see [Life360 documentation]({docs_url})."
},
"abort": {
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
}
},
"options": {
"step": {
"init": {
"title": "Account Options",
"data": {
"limit_gps_acc": "Limit GPS accuracy",
"max_gps_accuracy": "Max GPS accuracy (meters)",
"set_drive_speed": "Set driving speed threshold",
"driving_speed": "Driving speed",
"driving": "Show driving as state"
}
}
}
}
}

View File

@ -1,27 +1,44 @@
{
"config": {
"abort": {
"invalid_auth": "Invalid authentication",
"unknown": "Unexpected error"
},
"create_entry": {
"default": "To set advanced options, see [Life360 documentation]({docs_url})."
},
"error": {
"already_configured": "Account is already configured",
"invalid_auth": "Invalid authentication",
"invalid_username": "Invalid username",
"unknown": "Unexpected error"
},
"step": {
"user": {
"data": {
"password": "Password",
"username": "Username"
},
"description": "To set advanced options, see [Life360 documentation]({docs_url}).\nYou may want to do that before adding accounts.",
"title": "Life360 Account Info"
}
"config": {
"step": {
"user": {
"title": "Configure Life360 Account",
"data": {
"username": "Username",
"password": "Password"
}
},
"reauth_confirm": {
"title": "Reauthenticate Integration",
"data": {
"password": "Password"
}
}
},
"error": {
"invalid_auth": "Invalid authentication",
"already_configured": "Account is already configured",
"cannot_connect": "Failed to connect",
"unknown": "Unexpected error"
},
"abort": {
"invalid_auth": "Invalid authentication",
"already_configured": "Account is already configured",
"reauth_successful": "Re-authentication was successful"
}
}
},
"options": {
"step": {
"init": {
"title": "Account Options",
"data": {
"limit_gps_acc": "Limit GPS accuracy",
"max_gps_accuracy": "Max GPS accuracy (meters)",
"set_drive_speed": "Set driving speed threshold",
"driving_speed": "Driving speed",
"driving": "Show driving as state"
}
}
}
}
}

View File

@ -672,6 +672,9 @@ librouteros==3.2.0
# homeassistant.components.soundtouch
libsoundtouch==0.8
# homeassistant.components.life360
life360==4.1.1
# homeassistant.components.logi_circle
logi_circle==0.2.3

View File

@ -0,0 +1 @@
"""Tests for the Life360 integration."""

View File

@ -0,0 +1,309 @@
"""Test the Life360 config flow."""
from unittest.mock import patch
from life360 import Life360Error, LoginError
import pytest
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.life360.const import (
CONF_AUTHORIZATION,
CONF_DRIVING_SPEED,
CONF_MAX_GPS_ACCURACY,
DEFAULT_OPTIONS,
DOMAIN,
SHOW_DRIVING,
)
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from tests.common import MockConfigEntry
TEST_USER = "Test@Test.com"
TEST_PW = "password"
TEST_PW_3 = "password_3"
TEST_AUTHORIZATION = "authorization_string"
TEST_AUTHORIZATION_2 = "authorization_string_2"
TEST_AUTHORIZATION_3 = "authorization_string_3"
TEST_MAX_GPS_ACCURACY = "300"
TEST_DRIVING_SPEED = "18"
TEST_SHOW_DRIVING = True
USER_INPUT = {CONF_USERNAME: TEST_USER, CONF_PASSWORD: TEST_PW}
TEST_CONFIG_DATA = {
CONF_USERNAME: TEST_USER,
CONF_PASSWORD: TEST_PW,
CONF_AUTHORIZATION: TEST_AUTHORIZATION,
}
TEST_CONFIG_DATA_2 = {
CONF_USERNAME: TEST_USER,
CONF_PASSWORD: TEST_PW,
CONF_AUTHORIZATION: TEST_AUTHORIZATION_2,
}
TEST_CONFIG_DATA_3 = {
CONF_USERNAME: TEST_USER,
CONF_PASSWORD: TEST_PW_3,
CONF_AUTHORIZATION: TEST_AUTHORIZATION_3,
}
USER_OPTIONS = {
"limit_gps_acc": True,
CONF_MAX_GPS_ACCURACY: TEST_MAX_GPS_ACCURACY,
"set_drive_speed": True,
CONF_DRIVING_SPEED: TEST_DRIVING_SPEED,
SHOW_DRIVING: TEST_SHOW_DRIVING,
}
TEST_OPTIONS = {
CONF_MAX_GPS_ACCURACY: float(TEST_MAX_GPS_ACCURACY),
CONF_DRIVING_SPEED: float(TEST_DRIVING_SPEED),
SHOW_DRIVING: TEST_SHOW_DRIVING,
}
# ========== Common Fixtures & Functions ===============================================
@pytest.fixture(name="life360", autouse=True)
def life360_fixture():
"""Mock life360 config entry setup & unload."""
with patch(
"homeassistant.components.life360.async_setup_entry", return_value=True
), patch("homeassistant.components.life360.async_unload_entry", return_value=True):
yield
@pytest.fixture
def life360_api():
"""Mock Life360 api."""
with patch("homeassistant.components.life360.config_flow.Life360") as mock:
yield mock.return_value
def create_config_entry(hass, state=None):
"""Create mock config entry."""
config_entry = MockConfigEntry(
domain=DOMAIN,
data=TEST_CONFIG_DATA,
version=1,
state=state,
options=DEFAULT_OPTIONS,
unique_id=TEST_USER.lower(),
)
config_entry.add_to_hass(hass)
return config_entry
# ========== User Flow Tests ===========================================================
async def test_user_show_form(hass, life360_api):
"""Test that the form is served with no input."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await hass.async_block_till_done()
life360_api.get_authorization.assert_not_called()
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
assert not result["errors"]
schema = result["data_schema"].schema
assert set(schema) == set(USER_INPUT)
# username and password fields should be empty.
keys = list(schema)
for key in USER_INPUT:
assert keys[keys.index(key)].default == vol.UNDEFINED
async def test_user_config_flow_success(hass, life360_api):
"""Test a successful user config flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await hass.async_block_till_done()
life360_api.get_authorization.return_value = TEST_AUTHORIZATION
result = await hass.config_entries.flow.async_configure(
result["flow_id"], USER_INPUT
)
await hass.async_block_till_done()
life360_api.get_authorization.assert_called_once()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == TEST_USER.lower()
assert result["data"] == TEST_CONFIG_DATA
assert result["options"] == DEFAULT_OPTIONS
@pytest.mark.parametrize(
"exception,error", [(LoginError, "invalid_auth"), (Life360Error, "cannot_connect")]
)
async def test_user_config_flow_error(hass, life360_api, caplog, exception, error):
"""Test a user config flow with an error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await hass.async_block_till_done()
life360_api.get_authorization.side_effect = exception("test reason")
result = await hass.config_entries.flow.async_configure(
result["flow_id"], USER_INPUT
)
await hass.async_block_till_done()
life360_api.get_authorization.assert_called_once()
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
assert result["errors"]
assert result["errors"]["base"] == error
assert "test reason" in caplog.text
schema = result["data_schema"].schema
assert set(schema) == set(USER_INPUT)
# username and password fields should be prefilled with current values.
keys = list(schema)
for key, val in USER_INPUT.items():
default = keys[keys.index(key)].default
assert default != vol.UNDEFINED
assert default() == val
async def test_user_config_flow_already_configured(hass, life360_api):
"""Test a user config flow with an account already configured."""
create_config_entry(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], USER_INPUT
)
await hass.async_block_till_done()
life360_api.get_authorization.assert_not_called()
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
# ========== Reauth Flow Tests =========================================================
@pytest.mark.parametrize("state", [None, config_entries.ConfigEntryState.LOADED])
async def test_reauth_config_flow_success(hass, life360_api, caplog, state):
"""Test a successful reauthorization config flow."""
config_entry = create_config_entry(hass, state=state)
# Simulate current username & password are still valid, but authorization string has
# expired, such that getting a new authorization string from server is successful.
life360_api.get_authorization.return_value = TEST_AUTHORIZATION_2
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={
"source": config_entries.SOURCE_REAUTH,
"entry_id": config_entry.entry_id,
"title_placeholders": {"name": config_entry.title},
"unique_id": config_entry.unique_id,
},
data=config_entry.data,
)
await hass.async_block_till_done()
life360_api.get_authorization.assert_called_once()
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "reauth_successful"
assert "Reauthorization successful" in caplog.text
assert config_entry.data == TEST_CONFIG_DATA_2
async def test_reauth_config_flow_login_error(hass, life360_api, caplog):
"""Test a reauthorization config flow with a login error."""
config_entry = create_config_entry(hass)
# Simulate current username & password are invalid, which results in a form
# requesting new password, with old password as default value.
life360_api.get_authorization.side_effect = LoginError("test reason")
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={
"source": config_entries.SOURCE_REAUTH,
"entry_id": config_entry.entry_id,
"title_placeholders": {"name": config_entry.title},
"unique_id": config_entry.unique_id,
},
data=config_entry.data,
)
await hass.async_block_till_done()
life360_api.get_authorization.assert_called_once()
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "reauth_confirm"
assert result["errors"]
assert result["errors"]["base"] == "invalid_auth"
assert "test reason" in caplog.text
schema = result["data_schema"].schema
assert len(schema) == 1
assert "password" in schema
key = list(schema)[0]
assert key.default() == TEST_PW
# Simulate getting a new, valid password.
life360_api.get_authorization.reset_mock(side_effect=True)
life360_api.get_authorization.return_value = TEST_AUTHORIZATION_3
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_PASSWORD: TEST_PW_3}
)
await hass.async_block_till_done()
life360_api.get_authorization.assert_called_once()
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "reauth_successful"
assert "Reauthorization successful" in caplog.text
assert config_entry.data == TEST_CONFIG_DATA_3
# ========== Option flow Tests =========================================================
async def test_options_flow(hass):
"""Test an options flow."""
config_entry = create_config_entry(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
assert not result["errors"]
schema = result["data_schema"].schema
assert set(schema) == set(USER_OPTIONS)
flow_id = result["flow_id"]
result = await hass.config_entries.options.async_configure(flow_id, USER_OPTIONS)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == TEST_OPTIONS
assert config_entry.options == TEST_OPTIONS