diff --git a/.coveragerc b/.coveragerc index eae8060449a..f52631a57bd 100644 --- a/.coveragerc +++ b/.coveragerc @@ -640,8 +640,8 @@ omit = homeassistant/components/lg_soundbar/media_player.py homeassistant/components/life360/__init__.py homeassistant/components/life360/const.py + homeassistant/components/life360/coordinator.py homeassistant/components/life360/device_tracker.py - homeassistant/components/life360/helpers.py homeassistant/components/lifx/__init__.py homeassistant/components/lifx/const.py homeassistant/components/lifx/light.py diff --git a/CODEOWNERS b/CODEOWNERS index e349ebeacf0..1b6d88b5464 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -572,6 +572,7 @@ build.json @home-assistant/supervisor /tests/components/lcn/ @alengwenus /homeassistant/components/lg_netcast/ @Drafteed /homeassistant/components/life360/ @pnbruckner +/tests/components/life360/ @pnbruckner /homeassistant/components/lifx/ @Djelibeybi /homeassistant/components/light/ @home-assistant/core /tests/components/light/ @home-assistant/core diff --git a/homeassistant/components/life360/__init__.py b/homeassistant/components/life360/__init__.py index 89e7ee680a5..66c9416a1c1 100644 --- a/homeassistant/components/life360/__init__.py +++ b/homeassistant/components/life360/__init__.py @@ -1,11 +1,14 @@ """Life360 integration.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass, field +from typing import Any + import voluptuous as vol -from homeassistant import config_entries from homeassistant.components.device_tracker import CONF_SCAN_INTERVAL -from homeassistant.components.device_tracker.const import ( - SCAN_INTERVAL as DEFAULT_SCAN_INTERVAL, -) from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( CONF_EXCLUDE, @@ -16,12 +19,10 @@ from homeassistant.const import ( Platform, ) from homeassistant.core import HomeAssistant -from homeassistant.helpers import discovery import homeassistant.helpers.config_validation as cv from homeassistant.helpers.typing import ConfigType from .const import ( - CONF_AUTHORIZATION, CONF_CIRCLES, CONF_DRIVING_SPEED, CONF_ERROR_THRESHOLD, @@ -30,182 +31,147 @@ from .const import ( CONF_MEMBERS, CONF_SHOW_AS_STATE, CONF_WARNING_THRESHOLD, + DEFAULT_OPTIONS, DOMAIN, + LOGGER, SHOW_DRIVING, SHOW_MOVING, ) -from .helpers import get_api +from .coordinator import Life360DataUpdateCoordinator -DEFAULT_PREFIX = DOMAIN +PLATFORMS = [Platform.DEVICE_TRACKER] CONF_ACCOUNTS = "accounts" SHOW_AS_STATE_OPTS = [SHOW_DRIVING, SHOW_MOVING] -def _excl_incl_list_to_filter_dict(value): - return { - "include": CONF_INCLUDE in value, - "list": value.get(CONF_EXCLUDE) or value.get(CONF_INCLUDE), - } - - -def _prefix(value): - if not value: - return "" - if not value.endswith("_"): - return f"{value}_" - return value - - -def _thresholds(config): - error_threshold = config.get(CONF_ERROR_THRESHOLD) - warning_threshold = config.get(CONF_WARNING_THRESHOLD) - if error_threshold and warning_threshold: - if error_threshold <= warning_threshold: - raise vol.Invalid( - f"{CONF_ERROR_THRESHOLD} must be larger than {CONF_WARNING_THRESHOLD}" +def _show_as_state(config: dict) -> dict: + if opts := config.pop(CONF_SHOW_AS_STATE): + if SHOW_DRIVING in opts: + config[SHOW_DRIVING] = True + if SHOW_MOVING in opts: + LOGGER.warning( + "%s is no longer supported as an option for %s", + SHOW_MOVING, + CONF_SHOW_AS_STATE, ) - elif not error_threshold and warning_threshold: - config[CONF_ERROR_THRESHOLD] = warning_threshold + 1 - elif error_threshold and not warning_threshold: - # Make them the same which effectively prevents warnings. - config[CONF_WARNING_THRESHOLD] = error_threshold - else: - # Log all errors as errors. - config[CONF_ERROR_THRESHOLD] = 1 - config[CONF_WARNING_THRESHOLD] = 1 return config -ACCOUNT_SCHEMA = vol.Schema( - {vol.Required(CONF_USERNAME): cv.string, vol.Required(CONF_PASSWORD): cv.string} -) +def _unsupported(unsupported: set[str]) -> Callable[[dict], dict]: + """Warn about unsupported options and remove from config.""" -_SLUG_LIST = vol.All( - cv.ensure_list, [cv.slugify], vol.Length(min=1, msg="List cannot be empty") -) + def validator(config: dict) -> dict: + if unsupported_keys := unsupported & set(config): + LOGGER.warning( + "The following options are no longer supported: %s", + ", ".join(sorted(unsupported_keys)), + ) + return {k: v for k, v in config.items() if k not in unsupported} -_LOWER_STRING_LIST = vol.All( - cv.ensure_list, - [vol.All(cv.string, vol.Lower)], - vol.Length(min=1, msg="List cannot be empty"), -) + return validator -_EXCL_INCL_SLUG_LIST = vol.All( - vol.Schema( - { - vol.Exclusive(CONF_EXCLUDE, "incl_excl"): _SLUG_LIST, - vol.Exclusive(CONF_INCLUDE, "incl_excl"): _SLUG_LIST, - } - ), - cv.has_at_least_one_key(CONF_EXCLUDE, CONF_INCLUDE), - _excl_incl_list_to_filter_dict, -) - -_EXCL_INCL_LOWER_STRING_LIST = vol.All( - vol.Schema( - { - vol.Exclusive(CONF_EXCLUDE, "incl_excl"): _LOWER_STRING_LIST, - vol.Exclusive(CONF_INCLUDE, "incl_excl"): _LOWER_STRING_LIST, - } - ), - cv.has_at_least_one_key(CONF_EXCLUDE, CONF_INCLUDE), - _excl_incl_list_to_filter_dict, -) - -_THRESHOLD = vol.All(vol.Coerce(int), vol.Range(min=1)) +ACCOUNT_SCHEMA = { + vol.Required(CONF_USERNAME): cv.string, + vol.Required(CONF_PASSWORD): cv.string, +} +CIRCLES_MEMBERS = { + vol.Optional(CONF_EXCLUDE): vol.All(cv.ensure_list, [cv.string]), + vol.Optional(CONF_INCLUDE): vol.All(cv.ensure_list, [cv.string]), +} LIFE360_SCHEMA = vol.All( vol.Schema( { - vol.Optional(CONF_ACCOUNTS): vol.All( - cv.ensure_list, [ACCOUNT_SCHEMA], vol.Length(min=1) - ), - vol.Optional(CONF_CIRCLES): _EXCL_INCL_LOWER_STRING_LIST, + vol.Optional(CONF_ACCOUNTS): vol.All(cv.ensure_list, [ACCOUNT_SCHEMA]), + vol.Optional(CONF_CIRCLES): CIRCLES_MEMBERS, vol.Optional(CONF_DRIVING_SPEED): vol.Coerce(float), - vol.Optional(CONF_ERROR_THRESHOLD): _THRESHOLD, + vol.Optional(CONF_ERROR_THRESHOLD): vol.Coerce(int), vol.Optional(CONF_MAX_GPS_ACCURACY): vol.Coerce(float), - vol.Optional(CONF_MAX_UPDATE_WAIT): vol.All( - cv.time_period, cv.positive_timedelta - ), - vol.Optional(CONF_MEMBERS): _EXCL_INCL_SLUG_LIST, - vol.Optional(CONF_PREFIX, default=DEFAULT_PREFIX): vol.All( - vol.Any(None, cv.string), _prefix - ), - vol.Optional( - CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL - ): cv.time_period, + vol.Optional(CONF_MAX_UPDATE_WAIT): cv.time_period, + vol.Optional(CONF_MEMBERS): CIRCLES_MEMBERS, + vol.Optional(CONF_PREFIX): vol.Any(None, cv.string), + vol.Optional(CONF_SCAN_INTERVAL): cv.time_period, vol.Optional(CONF_SHOW_AS_STATE, default=[]): vol.All( cv.ensure_list, [vol.In(SHOW_AS_STATE_OPTS)] ), - vol.Optional(CONF_WARNING_THRESHOLD): _THRESHOLD, + vol.Optional(CONF_WARNING_THRESHOLD): vol.Coerce(int), } ), - _thresholds, + _unsupported( + { + CONF_ACCOUNTS, + CONF_CIRCLES, + CONF_ERROR_THRESHOLD, + CONF_MAX_UPDATE_WAIT, + CONF_MEMBERS, + CONF_PREFIX, + CONF_SCAN_INTERVAL, + CONF_WARNING_THRESHOLD, + } + ), + _show_as_state, +) +CONFIG_SCHEMA = vol.Schema( + vol.All({DOMAIN: LIFE360_SCHEMA}, cv.removed(DOMAIN, raise_if_present=False)), + extra=vol.ALLOW_EXTRA, ) -CONFIG_SCHEMA = vol.Schema({DOMAIN: LIFE360_SCHEMA}, extra=vol.ALLOW_EXTRA) + +@dataclass +class IntegData: + """Integration data.""" + + cfg_options: dict[str, Any] | None = None + # ConfigEntry.unique_id: Life360DataUpdateCoordinator + coordinators: dict[str, Life360DataUpdateCoordinator] = field( + init=False, default_factory=dict + ) + # member_id: ConfigEntry.unique_id + tracked_members: dict[str, str] = field(init=False, default_factory=dict) + logged_circles: list[str] = field(init=False, default_factory=list) + logged_places: list[str] = field(init=False, default_factory=list) + + def __post_init__(self): + """Finish initialization of cfg_options.""" + self.cfg_options = self.cfg_options or {} -def setup(hass: HomeAssistant, config: ConfigType) -> bool: +async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up integration.""" - conf = config.get(DOMAIN, LIFE360_SCHEMA({})) - hass.data[DOMAIN] = {"config": conf, "apis": {}} - discovery.load_platform(hass, Platform.DEVICE_TRACKER, DOMAIN, None, config) - - if CONF_ACCOUNTS not in conf: - return True - - # Check existing config entries. For any that correspond to an entry in - # configuration.yaml, and whose password has not changed, nothing needs to - # be done with that config entry or that account from configuration.yaml. - # But if the config entry was created by import and the account no longer - # exists in configuration.yaml, or if the password has changed, then delete - # that out-of-date config entry. - already_configured = [] - for entry in hass.config_entries.async_entries(DOMAIN): - # Find corresponding configuration.yaml entry and its password. - password = None - for account in conf[CONF_ACCOUNTS]: - if account[CONF_USERNAME] == entry.data[CONF_USERNAME]: - password = account[CONF_PASSWORD] - if password == entry.data[CONF_PASSWORD]: - already_configured.append(entry.data[CONF_USERNAME]) - continue - if ( - not password - and entry.source == config_entries.SOURCE_IMPORT - or password - and password != entry.data[CONF_PASSWORD] - ): - hass.async_create_task(hass.config_entries.async_remove(entry.entry_id)) - - # Create config entries for accounts listed in configuration. - for account in conf[CONF_ACCOUNTS]: - if account[CONF_USERNAME] not in already_configured: - hass.async_create_task( - hass.config_entries.flow.async_init( - DOMAIN, - context={"source": config_entries.SOURCE_IMPORT}, - data=account, - ) - ) + hass.data.setdefault(DOMAIN, IntegData(config.get(DOMAIN))) return True async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up config entry.""" - hass.data[DOMAIN]["apis"][entry.data[CONF_USERNAME]] = get_api( - entry.data[CONF_AUTHORIZATION] - ) + hass.data.setdefault(DOMAIN, IntegData()) + + # Check if this entry was created when this was a "legacy" tracker. If it was, + # update with missing data. + if not entry.unique_id: + hass.config_entries.async_update_entry( + entry, + unique_id=entry.data[CONF_USERNAME].lower(), + options=DEFAULT_OPTIONS | hass.data[DOMAIN].cfg_options, + ) + + coordinator = Life360DataUpdateCoordinator(hass, entry) + + await coordinator.async_config_entry_first_refresh() + + hass.data[DOMAIN].coordinators[entry.entry_id] = coordinator + + # Set up components for our platforms. + hass.config_entries.async_setup_platforms(entry, PLATFORMS) + return True async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload config entry.""" - try: - hass.data[DOMAIN]["apis"].pop(entry.data[CONF_USERNAME]) - return True - except KeyError: - return False + del hass.data[DOMAIN].coordinators[entry.entry_id] + + # Unload components for our platforms. + return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) diff --git a/homeassistant/components/life360/config_flow.py b/homeassistant/components/life360/config_flow.py index 0a200e72097..331882aa991 100644 --- a/homeassistant/components/life360/config_flow.py +++ b/homeassistant/components/life360/config_flow.py @@ -1,108 +1,199 @@ """Config flow to configure Life360 integration.""" -from collections import OrderedDict -import logging -from life360 import Life360Error, LoginError +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any, cast + +from life360 import Life360, Life360Error, LoginError import voluptuous as vol -from homeassistant import config_entries +from homeassistant.config_entries import ConfigEntry, ConfigFlow, OptionsFlow from homeassistant.const import CONF_PASSWORD, CONF_USERNAME +from homeassistant.core import callback +from homeassistant.data_entry_flow import FlowResult +import homeassistant.helpers.config_validation as cv -from .const import CONF_AUTHORIZATION, DOMAIN -from .helpers import get_api +from .const import ( + COMM_MAX_RETRIES, + COMM_TIMEOUT, + CONF_AUTHORIZATION, + CONF_DRIVING_SPEED, + CONF_MAX_GPS_ACCURACY, + DEFAULT_OPTIONS, + DOMAIN, + LOGGER, + OPTIONS, + SHOW_DRIVING, +) -_LOGGER = logging.getLogger(__name__) - -DOCS_URL = "https://www.home-assistant.io/integrations/life360" +LIMIT_GPS_ACC = "limit_gps_acc" +SET_DRIVE_SPEED = "set_drive_speed" -class Life360ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): +def account_schema( + def_username: str | vol.UNDEFINED = vol.UNDEFINED, + def_password: str | vol.UNDEFINED = vol.UNDEFINED, +) -> dict[vol.Marker, Any]: + """Return schema for an account with optional default values.""" + return { + vol.Required(CONF_USERNAME, default=def_username): cv.string, + vol.Required(CONF_PASSWORD, default=def_password): cv.string, + } + + +def password_schema( + def_password: str | vol.UNDEFINED = vol.UNDEFINED, +) -> dict[vol.Marker, Any]: + """Return schema for a password with optional default value.""" + return {vol.Required(CONF_PASSWORD, default=def_password): cv.string} + + +class Life360ConfigFlow(ConfigFlow, domain=DOMAIN): """Life360 integration config flow.""" VERSION = 1 - def __init__(self): + def __init__(self) -> None: """Initialize.""" - self._api = get_api() - self._username = vol.UNDEFINED - self._password = vol.UNDEFINED + self._api = Life360(timeout=COMM_TIMEOUT, max_retries=COMM_MAX_RETRIES) + self._username: str | vol.UNDEFINED = vol.UNDEFINED + self._password: str | vol.UNDEFINED = vol.UNDEFINED + self._reauth_entry: ConfigEntry | None = None - @property - def configured_usernames(self): - """Return tuple of configured usernames.""" - entries = self._async_current_entries() - if entries: - return (entry.data[CONF_USERNAME] for entry in entries) - return () + @staticmethod + @callback + def async_get_options_flow(config_entry: ConfigEntry) -> Life360OptionsFlow: + """Get the options flow for this handler.""" + return Life360OptionsFlow(config_entry) - async def async_step_user(self, user_input=None): - """Handle a user initiated config flow.""" - errors = {} - - if user_input is not None: - self._username = user_input[CONF_USERNAME] - self._password = user_input[CONF_PASSWORD] - try: - # pylint: disable=no-value-for-parameter - vol.Email()(self._username) - authorization = await self.hass.async_add_executor_job( - self._api.get_authorization, self._username, self._password - ) - except vol.Invalid: - errors[CONF_USERNAME] = "invalid_username" - except LoginError: - errors["base"] = "invalid_auth" - except Life360Error as error: - _LOGGER.error( - "Unexpected error communicating with Life360 server: %s", error - ) - errors["base"] = "unknown" - else: - if self._username in self.configured_usernames: - errors["base"] = "already_configured" - else: - return self.async_create_entry( - title=self._username, - data={ - CONF_USERNAME: self._username, - CONF_PASSWORD: self._password, - CONF_AUTHORIZATION: authorization, - }, - description_placeholders={"docs_url": DOCS_URL}, - ) - - data_schema = OrderedDict() - data_schema[vol.Required(CONF_USERNAME, default=self._username)] = str - data_schema[vol.Required(CONF_PASSWORD, default=self._password)] = str - - return self.async_show_form( - step_id="user", - data_schema=vol.Schema(data_schema), - errors=errors, - description_placeholders={"docs_url": DOCS_URL}, - ) - - async def async_step_import(self, user_input): - """Import a config flow from configuration.""" - username = user_input[CONF_USERNAME] - password = user_input[CONF_PASSWORD] + async def _async_verify(self, step_id: str) -> FlowResult: + """Attempt to authorize the provided credentials.""" + errors: dict[str, str] = {} try: authorization = await self.hass.async_add_executor_job( - self._api.get_authorization, username, password + self._api.get_authorization, self._username, self._password ) - except LoginError: - _LOGGER.error("Invalid credentials for %s", username) - return self.async_abort(reason="invalid_auth") - except Life360Error as error: - _LOGGER.error( - "Unexpected error communicating with Life360 server: %s", error + except LoginError as exc: + LOGGER.debug("Login error: %s", exc) + errors["base"] = "invalid_auth" + except Life360Error as exc: + LOGGER.debug("Unexpected error communicating with Life360 server: %s", exc) + errors["base"] = "cannot_connect" + if errors: + if step_id == "user": + schema = account_schema(self._username, self._password) + else: + schema = password_schema(self._password) + return self.async_show_form( + step_id=step_id, data_schema=vol.Schema(schema), errors=errors ) - return self.async_abort(reason="unknown") + + data = { + CONF_USERNAME: self._username, + CONF_PASSWORD: self._password, + CONF_AUTHORIZATION: authorization, + } + + if self._reauth_entry: + LOGGER.debug("Reauthorization successful") + self.hass.config_entries.async_update_entry(self._reauth_entry, data=data) + self.hass.async_create_task( + self.hass.config_entries.async_reload(self._reauth_entry.entry_id) + ) + return self.async_abort(reason="reauth_successful") + return self.async_create_entry( - title=f"{username} (from configuration)", - data={ - CONF_USERNAME: username, - CONF_PASSWORD: password, - CONF_AUTHORIZATION: authorization, - }, + title=cast(str, self.unique_id), data=data, options=DEFAULT_OPTIONS ) + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle a config flow initiated by the user.""" + if not user_input: + return self.async_show_form( + step_id="user", data_schema=vol.Schema(account_schema()) + ) + + self._username = user_input[CONF_USERNAME] + self._password = user_input[CONF_PASSWORD] + + await self.async_set_unique_id(self._username.lower()) + self._abort_if_unique_id_configured() + + return await self._async_verify("user") + + async def async_step_reauth(self, data: Mapping[str, Any]) -> FlowResult: + """Handle reauthorization.""" + self._username = data[CONF_USERNAME] + self._reauth_entry = self.hass.config_entries.async_get_entry( + self.context["entry_id"] + ) + # Always start with current credentials since they may still be valid and a + # simple reauthorization will be successful. + return await self.async_step_reauth_confirm(dict(data)) + + async def async_step_reauth_confirm(self, user_input: dict[str, Any]) -> FlowResult: + """Handle reauthorization completion.""" + self._password = user_input[CONF_PASSWORD] + return await self._async_verify("reauth_confirm") + + +class Life360OptionsFlow(OptionsFlow): + """Life360 integration options flow.""" + + def __init__(self, config_entry: ConfigEntry) -> None: + """Initialize.""" + self.config_entry = config_entry + + async def async_step_init( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: + """Handle account options.""" + options = self.config_entry.options + + if user_input is not None: + new_options = _extract_account_options(user_input) + return self.async_create_entry(title="", data=new_options) + + return self.async_show_form( + step_id="init", data_schema=vol.Schema(_account_options_schema(options)) + ) + + +def _account_options_schema(options: Mapping[str, Any]) -> dict[vol.Marker, Any]: + """Create schema for account options form.""" + def_limit_gps_acc = options[CONF_MAX_GPS_ACCURACY] is not None + def_max_gps = options[CONF_MAX_GPS_ACCURACY] or vol.UNDEFINED + def_set_drive_speed = options[CONF_DRIVING_SPEED] is not None + def_speed = options[CONF_DRIVING_SPEED] or vol.UNDEFINED + def_show_driving = options[SHOW_DRIVING] + + return { + vol.Required(LIMIT_GPS_ACC, default=def_limit_gps_acc): bool, + vol.Optional(CONF_MAX_GPS_ACCURACY, default=def_max_gps): vol.Coerce(float), + vol.Required(SET_DRIVE_SPEED, default=def_set_drive_speed): bool, + vol.Optional(CONF_DRIVING_SPEED, default=def_speed): vol.Coerce(float), + vol.Optional(SHOW_DRIVING, default=def_show_driving): bool, + } + + +def _extract_account_options(user_input: dict) -> dict[str, Any]: + """Remove options from user input and return as a separate dict.""" + result = {} + + for key in OPTIONS: + value = user_input.pop(key, None) + # Was "include" checkbox (if there was one) corresponding to option key True + # (meaning option should be included)? + incl = user_input.pop( + { + CONF_MAX_GPS_ACCURACY: LIMIT_GPS_ACC, + CONF_DRIVING_SPEED: SET_DRIVE_SPEED, + }.get(key), + True, + ) + result[key] = value if incl else None + + return result diff --git a/homeassistant/components/life360/const.py b/homeassistant/components/life360/const.py index 41f4a990e67..ccaf69877d6 100644 --- a/homeassistant/components/life360/const.py +++ b/homeassistant/components/life360/const.py @@ -1,5 +1,25 @@ """Constants for Life360 integration.""" + +from datetime import timedelta +import logging + DOMAIN = "life360" +LOGGER = logging.getLogger(__package__) + +ATTRIBUTION = "Data provided by life360.com" +COMM_MAX_RETRIES = 2 +COMM_TIMEOUT = 3.05 +SPEED_FACTOR_MPH = 2.25 +SPEED_DIGITS = 1 +UPDATE_INTERVAL = timedelta(seconds=10) + +ATTR_ADDRESS = "address" +ATTR_AT_LOC_SINCE = "at_loc_since" +ATTR_DRIVING = "driving" +ATTR_LAST_SEEN = "last_seen" +ATTR_PLACE = "place" +ATTR_SPEED = "speed" +ATTR_WIFI_ON = "wifi_on" CONF_AUTHORIZATION = "authorization" CONF_CIRCLES = "circles" @@ -13,3 +33,10 @@ CONF_WARNING_THRESHOLD = "warning_threshold" SHOW_DRIVING = "driving" SHOW_MOVING = "moving" + +DEFAULT_OPTIONS = { + CONF_DRIVING_SPEED: None, + CONF_MAX_GPS_ACCURACY: None, + SHOW_DRIVING: False, +} +OPTIONS = list(DEFAULT_OPTIONS.keys()) diff --git a/homeassistant/components/life360/coordinator.py b/homeassistant/components/life360/coordinator.py new file mode 100644 index 00000000000..dc7fdb73a8c --- /dev/null +++ b/homeassistant/components/life360/coordinator.py @@ -0,0 +1,201 @@ +"""DataUpdateCoordinator for the Life360 integration.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + +from life360 import Life360, Life360Error, LoginError + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ( + LENGTH_FEET, + LENGTH_KILOMETERS, + LENGTH_METERS, + LENGTH_MILES, +) +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import ConfigEntryAuthFailed +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed +from homeassistant.util.distance import convert +import homeassistant.util.dt as dt_util + +from .const import ( + COMM_MAX_RETRIES, + COMM_TIMEOUT, + CONF_AUTHORIZATION, + DOMAIN, + LOGGER, + SPEED_DIGITS, + SPEED_FACTOR_MPH, + UPDATE_INTERVAL, +) + + +@dataclass +class Life360Place: + """Life360 Place data.""" + + name: str + latitude: float + longitude: float + radius: float + + +@dataclass +class Life360Circle: + """Life360 Circle data.""" + + name: str + places: dict[str, Life360Place] + + +@dataclass +class Life360Member: + """Life360 Member data.""" + + # Don't include address field in eq comparison because it often changes (back and + # forth) between updates. If it was included there would be way more state changes + # and database updates than is useful. + address: str | None = field(compare=False) + at_loc_since: datetime + battery_charging: bool + battery_level: int + driving: bool + entity_picture: str + gps_accuracy: int + last_seen: datetime + latitude: float + longitude: float + name: str + place: str | None + speed: float + wifi_on: bool + + +@dataclass +class Life360Data: + """Life360 data.""" + + circles: dict[str, Life360Circle] = field(init=False, default_factory=dict) + members: dict[str, Life360Member] = field(init=False, default_factory=dict) + + +class Life360DataUpdateCoordinator(DataUpdateCoordinator): + """Life360 data update coordinator.""" + + def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: + """Initialize data update coordinator.""" + super().__init__( + hass, + LOGGER, + name=f"{DOMAIN} ({entry.unique_id})", + update_interval=UPDATE_INTERVAL, + ) + self._hass = hass + self._api = Life360( + timeout=COMM_TIMEOUT, + max_retries=COMM_MAX_RETRIES, + authorization=entry.data[CONF_AUTHORIZATION], + ) + + async def _retrieve_data(self, func: str, *args: Any) -> list[dict[str, Any]]: + """Get data from Life360.""" + try: + return await self._hass.async_add_executor_job( + getattr(self._api, func), *args + ) + except LoginError as exc: + LOGGER.debug("Login error: %s", exc) + raise ConfigEntryAuthFailed from exc + except Life360Error as exc: + LOGGER.debug("%s: %s", exc.__class__.__name__, exc) + raise UpdateFailed from exc + + async def _async_update_data(self) -> Life360Data: + """Get & process data from Life360.""" + + data = Life360Data() + + for circle in await self._retrieve_data("get_circles"): + circle_id = circle["id"] + circle_members = await self._retrieve_data("get_circle_members", circle_id) + circle_places = await self._retrieve_data("get_circle_places", circle_id) + + data.circles[circle_id] = Life360Circle( + circle["name"], + { + place["id"]: Life360Place( + place["name"], + float(place["latitude"]), + float(place["longitude"]), + float(place["radius"]), + ) + for place in circle_places + }, + ) + + for member in circle_members: + # Member isn't sharing location. + if not int(member["features"]["shareLocation"]): + continue + + # Note that member may be in more than one circle. If that's the case just + # go ahead and process the newly retrieved data (overwriting the older + # data), since it might be slightly newer than what was retrieved while + # processing another circle. + + first = member["firstName"] + last = member["lastName"] + if first and last: + name = " ".join([first, last]) + else: + name = first or last + + loc = member["location"] + if not loc: + if err_msg := member["issues"]["title"]: + if member["issues"]["dialog"]: + err_msg += f": {member['issues']['dialog']}" + else: + err_msg = "Location information missing" + LOGGER.error("%s: %s", name, err_msg) + continue + + place = loc["name"] or None + + if place: + address: str | None = place + else: + address1 = loc["address1"] or None + address2 = loc["address2"] or None + if address1 and address2: + address = ", ".join([address1, address2]) + else: + address = address1 or address2 + + speed = max(0, float(loc["speed"]) * SPEED_FACTOR_MPH) + if self._hass.config.units.is_metric: + speed = convert(speed, LENGTH_MILES, LENGTH_KILOMETERS) + + data.members[member["id"]] = Life360Member( + address, + dt_util.utc_from_timestamp(int(loc["since"])), + bool(int(loc["charge"])), + int(float(loc["battery"])), + bool(int(loc["isDriving"])), + member["avatar"], + # Life360 reports accuracy in feet, but Device Tracker expects + # gps_accuracy in meters. + round(convert(float(loc["accuracy"]), LENGTH_FEET, LENGTH_METERS)), + dt_util.utc_from_timestamp(int(loc["timestamp"])), + float(loc["latitude"]), + float(loc["longitude"]), + name, + place, + round(speed, SPEED_DIGITS), + bool(int(loc["wifiState"])), + ) + + return data diff --git a/homeassistant/components/life360/device_tracker.py b/homeassistant/components/life360/device_tracker.py index 2451a237a1e..a38181a6830 100644 --- a/homeassistant/components/life360/device_tracker.py +++ b/homeassistant/components/life360/device_tracker.py @@ -1,432 +1,244 @@ """Support for Life360 device tracking.""" + from __future__ import annotations -from collections.abc import Callable -from datetime import timedelta -import logging +from collections.abc import Mapping +from typing import Any, cast -from life360 import Life360Error -import voluptuous as vol - -from homeassistant.components.device_tracker import ( - CONF_SCAN_INTERVAL, - DOMAIN as DEVICE_TRACKER_DOMAIN, +from homeassistant.components.device_tracker import SOURCE_TYPE_GPS +from homeassistant.components.device_tracker.config_entry import TrackerEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ATTR_BATTERY_CHARGING +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.update_coordinator import ( + CoordinatorEntity, + DataUpdateCoordinator, ) -from homeassistant.components.zone import async_active_zone -from homeassistant.const import ( - ATTR_BATTERY_CHARGING, - ATTR_ENTITY_ID, - CONF_PREFIX, - LENGTH_FEET, - LENGTH_KILOMETERS, - LENGTH_METERS, - LENGTH_MILES, - STATE_UNKNOWN, -) -from homeassistant.core import HomeAssistant -import homeassistant.helpers.config_validation as cv -from homeassistant.helpers.event import track_time_interval -from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType -from homeassistant.util.async_ import run_callback_threadsafe -from homeassistant.util.distance import convert -import homeassistant.util.dt as dt_util from .const import ( - CONF_CIRCLES, + ATTR_ADDRESS, + ATTR_AT_LOC_SINCE, + ATTR_DRIVING, + ATTR_LAST_SEEN, + ATTR_PLACE, + ATTR_SPEED, + ATTR_WIFI_ON, + ATTRIBUTION, CONF_DRIVING_SPEED, - CONF_ERROR_THRESHOLD, CONF_MAX_GPS_ACCURACY, - CONF_MAX_UPDATE_WAIT, - CONF_MEMBERS, - CONF_SHOW_AS_STATE, - CONF_WARNING_THRESHOLD, DOMAIN, + LOGGER, SHOW_DRIVING, - SHOW_MOVING, ) -_LOGGER = logging.getLogger(__name__) - -SPEED_FACTOR_MPH = 2.25 -EVENT_DELAY = timedelta(seconds=30) - -ATTR_ADDRESS = "address" -ATTR_AT_LOC_SINCE = "at_loc_since" -ATTR_DRIVING = "driving" -ATTR_LAST_SEEN = "last_seen" -ATTR_MOVING = "moving" -ATTR_PLACE = "place" -ATTR_RAW_SPEED = "raw_speed" -ATTR_SPEED = "speed" -ATTR_WAIT = "wait" -ATTR_WIFI_ON = "wifi_on" - -EVENT_UPDATE_OVERDUE = "life360_update_overdue" -EVENT_UPDATE_RESTORED = "life360_update_restored" +_LOC_ATTRS = ( + "address", + "at_loc_since", + "driving", + "gps_accuracy", + "last_seen", + "latitude", + "longitude", + "place", + "speed", +) -def _include_name(filter_dict, name): - if not name: +async def async_setup_entry( + hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback +) -> None: + """Set up the device tracker platform.""" + coordinator = hass.data[DOMAIN].coordinators[entry.entry_id] + tracked_members = hass.data[DOMAIN].tracked_members + logged_circles = hass.data[DOMAIN].logged_circles + logged_places = hass.data[DOMAIN].logged_places + + @callback + def process_data(new_members_only: bool = True) -> None: + """Process new Life360 data.""" + for circle_id, circle in coordinator.data.circles.items(): + if circle_id not in logged_circles: + logged_circles.append(circle_id) + LOGGER.debug("Circle: %s", circle.name) + + new_places = [] + for place_id, place in circle.places.items(): + if place_id not in logged_places: + logged_places.append(place_id) + new_places.append(place) + if new_places: + msg = f"Places from {circle.name}:" + for place in new_places: + msg += f"\n- name: {place.name}" + msg += f"\n latitude: {place.latitude}" + msg += f"\n longitude: {place.longitude}" + msg += f"\n radius: {place.radius}" + LOGGER.debug(msg) + + new_entities = [] + for member_id, member in coordinator.data.members.items(): + tracked_by_account = tracked_members.get(member_id) + if new_member := not tracked_by_account: + tracked_members[member_id] = entry.unique_id + LOGGER.debug("Member: %s", member.name) + if ( + new_member + or tracked_by_account == entry.unique_id + and not new_members_only + ): + new_entities.append(Life360DeviceTracker(coordinator, member_id)) + if new_entities: + async_add_entities(new_entities) + + process_data(new_members_only=False) + entry.async_on_unload(coordinator.async_add_listener(process_data)) + + +class Life360DeviceTracker(CoordinatorEntity, TrackerEntity): + """Life360 Device Tracker.""" + + _attr_attribution = ATTRIBUTION + + def __init__(self, coordinator: DataUpdateCoordinator, member_id: str) -> None: + """Initialize Life360 Entity.""" + super().__init__(coordinator) + self._attr_unique_id = member_id + + self._data = coordinator.data.members[self.unique_id] + + self._attr_name = self._data.name + self._attr_entity_picture = self._data.entity_picture + + self._prev_data = self._data + + @property + def _options(self) -> Mapping[str, Any]: + """Shortcut to config entry options.""" + return cast(Mapping[str, Any], self.coordinator.config_entry.options) + + @callback + def _handle_coordinator_update(self) -> None: + """Handle updated data from the coordinator.""" + # Get a shortcut to this member's data. Can't guarantee it's the same dict every + # update, or that there is even data for this member every update, so need to + # update shortcut each time. + self._data = self.coordinator.data.members.get(self.unique_id) + + if self.available: + # If nothing important has changed, then skip the update altogether. + if self._data == self._prev_data: + return + + # Check if we should effectively throw out new location data. + last_seen = self._data.last_seen + prev_seen = self._prev_data.last_seen + max_gps_acc = self._options.get(CONF_MAX_GPS_ACCURACY) + bad_last_seen = last_seen < prev_seen + bad_accuracy = ( + max_gps_acc is not None and self.location_accuracy > max_gps_acc + ) + if bad_last_seen or bad_accuracy: + if bad_last_seen: + LOGGER.warning( + "%s: Ignoring location update because " + "last_seen (%s) < previous last_seen (%s)", + self.entity_id, + last_seen, + prev_seen, + ) + if bad_accuracy: + LOGGER.warning( + "%s: Ignoring location update because " + "expected GPS accuracy (%0.1f) is not met: %i", + self.entity_id, + max_gps_acc, + self.location_accuracy, + ) + # Overwrite new location related data with previous values. + for attr in _LOC_ATTRS: + setattr(self._data, attr, getattr(self._prev_data, attr)) + + self._prev_data = self._data + + super()._handle_coordinator_update() + + @property + def force_update(self) -> bool: + """Return True if state updates should be forced.""" return False - if not filter_dict: - return True - name = name.lower() - if filter_dict["include"]: - return name in filter_dict["list"] - return name not in filter_dict["list"] + @property + def available(self) -> bool: + """Return if entity is available.""" + # Guard against member not being in last update for some reason. + return super().available and self._data is not None -def _exc_msg(exc): - return f"{exc.__class__.__name__}: {exc}" + @property + def entity_picture(self) -> str | None: + """Return the entity picture to use in the frontend, if any.""" + if self.available: + self._attr_entity_picture = self._data.entity_picture + return super().entity_picture + # All of the following will only be called if self.available is True. -def _dump_filter(filter_dict, desc, func=lambda x: x): - if not filter_dict: - return - _LOGGER.debug( - "%scluding %s: %s", - "In" if filter_dict["include"] else "Ex", - desc, - ", ".join([func(name) for name in filter_dict["list"]]), - ) + @property + def battery_level(self) -> int | None: + """Return the battery level of the device. + Percentage from 0-100. + """ + return self._data.battery_level -def setup_scanner( - hass: HomeAssistant, - config: ConfigType, - see: Callable[..., None], - discovery_info: DiscoveryInfoType | None = None, -) -> bool: - """Set up device scanner.""" - config = hass.data[DOMAIN]["config"] - apis = hass.data[DOMAIN]["apis"] - Life360Scanner(hass, config, see, apis) - return True + @property + def source_type(self) -> str: + """Return the source type, eg gps or router, of the device.""" + return SOURCE_TYPE_GPS + @property + def location_accuracy(self) -> int: + """Return the location accuracy of the device. -def _utc_from_ts(val): - try: - return dt_util.utc_from_timestamp(float(val)) - except (TypeError, ValueError): + Value in meters. + """ + return self._data.gps_accuracy + + @property + def driving(self) -> bool: + """Return if driving.""" + if (driving_speed := self._options.get(CONF_DRIVING_SPEED)) is not None: + if self._data.speed >= driving_speed: + return True + return self._data.driving + + @property + def location_name(self) -> str | None: + """Return a location name for the current location of the device.""" + if self._options.get(SHOW_DRIVING) and self.driving: + return "Driving" return None + @property + def latitude(self) -> float | None: + """Return latitude value of the device.""" + return self._data.latitude -def _dt_attr_from_ts(timestamp): - utc = _utc_from_ts(timestamp) - if utc: - return utc - return STATE_UNKNOWN + @property + def longitude(self) -> float | None: + """Return longitude value of the device.""" + return self._data.longitude - -def _bool_attr_from_int(val): - try: - return bool(int(val)) - except (TypeError, ValueError): - return STATE_UNKNOWN - - -class Life360Scanner: - """Life360 device scanner.""" - - def __init__(self, hass, config, see, apis): - """Initialize Life360Scanner.""" - self._hass = hass - self._see = see - self._max_gps_accuracy = config.get(CONF_MAX_GPS_ACCURACY) - self._max_update_wait = config.get(CONF_MAX_UPDATE_WAIT) - self._prefix = config[CONF_PREFIX] - self._circles_filter = config.get(CONF_CIRCLES) - self._members_filter = config.get(CONF_MEMBERS) - self._driving_speed = config.get(CONF_DRIVING_SPEED) - self._show_as_state = config[CONF_SHOW_AS_STATE] - self._apis = apis - self._errs = {} - self._error_threshold = config[CONF_ERROR_THRESHOLD] - self._warning_threshold = config[CONF_WARNING_THRESHOLD] - self._max_errs = self._error_threshold + 1 - self._dev_data = {} - self._circles_logged = set() - self._members_logged = set() - - _dump_filter(self._circles_filter, "Circles") - _dump_filter(self._members_filter, "device IDs", self._dev_id) - - self._started = dt_util.utcnow() - self._update_life360() - track_time_interval( - self._hass, self._update_life360, config[CONF_SCAN_INTERVAL] - ) - - def _dev_id(self, name): - return self._prefix + name - - def _ok(self, key): - if self._errs.get(key, 0) >= self._max_errs: - _LOGGER.error("%s: OK again", key) - self._errs[key] = 0 - - def _err(self, key, err_msg): - _errs = self._errs.get(key, 0) - if _errs < self._max_errs: - self._errs[key] = _errs = _errs + 1 - msg = f"{key}: {err_msg}" - if _errs >= self._error_threshold: - if _errs == self._max_errs: - msg = f"Suppressing further errors until OK: {msg}" - _LOGGER.error(msg) - elif _errs >= self._warning_threshold: - _LOGGER.warning(msg) - - def _exc(self, key, exc): - self._err(key, _exc_msg(exc)) - - def _prev_seen(self, dev_id, last_seen): - prev_seen, reported = self._dev_data.get(dev_id, (None, False)) - - if self._max_update_wait: - now = dt_util.utcnow() - most_recent_update = last_seen or prev_seen or self._started - overdue = now - most_recent_update > self._max_update_wait - if overdue and not reported and now - self._started > EVENT_DELAY: - self._hass.bus.fire( - EVENT_UPDATE_OVERDUE, - {ATTR_ENTITY_ID: f"{DEVICE_TRACKER_DOMAIN}.{dev_id}"}, - ) - reported = True - elif not overdue and reported: - self._hass.bus.fire( - EVENT_UPDATE_RESTORED, - { - ATTR_ENTITY_ID: f"{DEVICE_TRACKER_DOMAIN}.{dev_id}", - ATTR_WAIT: str(last_seen - (prev_seen or self._started)).split( - ".", maxsplit=1 - )[0], - }, - ) - reported = False - - # Don't remember last_seen unless it's really an update. - if not last_seen or prev_seen and last_seen <= prev_seen: - last_seen = prev_seen - self._dev_data[dev_id] = last_seen, reported - - return prev_seen - - def _update_member(self, member, dev_id): - loc = member.get("location") - try: - last_seen = _utc_from_ts(loc.get("timestamp")) - except AttributeError: - last_seen = None - prev_seen = self._prev_seen(dev_id, last_seen) - - if not loc: - if err_msg := member["issues"]["title"]: - if member["issues"]["dialog"]: - err_msg += f": {member['issues']['dialog']}" - else: - err_msg = "Location information missing" - self._err(dev_id, err_msg) - return - - # Only update when we truly have an update. - if not last_seen: - _LOGGER.warning("%s: Ignoring update because timestamp is missing", dev_id) - return - if prev_seen and last_seen < prev_seen: - _LOGGER.warning( - "%s: Ignoring update because timestamp is older than last timestamp", - dev_id, - ) - _LOGGER.debug("%s < %s", last_seen, prev_seen) - return - if last_seen == prev_seen: - return - - lat = loc.get("latitude") - lon = loc.get("longitude") - gps_accuracy = loc.get("accuracy") - try: - lat = float(lat) - lon = float(lon) - # Life360 reports accuracy in feet, but Device Tracker expects - # gps_accuracy in meters. - gps_accuracy = round( - convert(float(gps_accuracy), LENGTH_FEET, LENGTH_METERS) - ) - except (TypeError, ValueError): - self._err(dev_id, f"GPS data invalid: {lat}, {lon}, {gps_accuracy}") - return - - self._ok(dev_id) - - msg = f"Updating {dev_id}" - if prev_seen: - msg += f"; Time since last update: {last_seen - prev_seen}" - _LOGGER.debug(msg) - - if self._max_gps_accuracy is not None and gps_accuracy > self._max_gps_accuracy: - _LOGGER.warning( - "%s: Ignoring update because expected GPS " - "accuracy (%.0f) is not met: %.0f", - dev_id, - self._max_gps_accuracy, - gps_accuracy, - ) - return - - # Get raw attribute data, converting empty strings to None. - place = loc.get("name") or None - address1 = loc.get("address1") or None - address2 = loc.get("address2") or None - if address1 and address2: - address = ", ".join([address1, address2]) - else: - address = address1 or address2 - raw_speed = loc.get("speed") or None - driving = _bool_attr_from_int(loc.get("isDriving")) - moving = _bool_attr_from_int(loc.get("inTransit")) - try: - battery = int(float(loc.get("battery"))) - except (TypeError, ValueError): - battery = None - - # Try to convert raw speed into real speed. - try: - speed = float(raw_speed) * SPEED_FACTOR_MPH - if self._hass.config.units.is_metric: - speed = convert(speed, LENGTH_MILES, LENGTH_KILOMETERS) - speed = max(0, round(speed)) - except (TypeError, ValueError): - speed = STATE_UNKNOWN - - # Make driving attribute True if it isn't and we can derive that it - # should be True from other data. - if ( - driving in (STATE_UNKNOWN, False) - and self._driving_speed is not None - and speed != STATE_UNKNOWN - ): - driving = speed >= self._driving_speed - - attrs = { - ATTR_ADDRESS: address, - ATTR_AT_LOC_SINCE: _dt_attr_from_ts(loc.get("since")), - ATTR_BATTERY_CHARGING: _bool_attr_from_int(loc.get("charge")), - ATTR_DRIVING: driving, - ATTR_LAST_SEEN: last_seen, - ATTR_MOVING: moving, - ATTR_PLACE: place, - ATTR_RAW_SPEED: raw_speed, - ATTR_SPEED: speed, - ATTR_WIFI_ON: _bool_attr_from_int(loc.get("wifiState")), - } - - # If user wants driving or moving to be shown as state, and current - # location is not in a HA zone, then set location name accordingly. - loc_name = None - active_zone = run_callback_threadsafe( - self._hass.loop, async_active_zone, self._hass, lat, lon, gps_accuracy - ).result() - if not active_zone: - if SHOW_DRIVING in self._show_as_state and driving is True: - loc_name = SHOW_DRIVING - elif SHOW_MOVING in self._show_as_state and moving is True: - loc_name = SHOW_MOVING - - self._see( - dev_id=dev_id, - location_name=loc_name, - gps=(lat, lon), - gps_accuracy=gps_accuracy, - battery=battery, - attributes=attrs, - picture=member.get("avatar"), - ) - - def _update_members(self, members, members_updated): - for member in members: - member_id = member["id"] - if member_id in members_updated: - continue - err_key = "Member data" - try: - first = member.get("firstName") - last = member.get("lastName") - if first and last: - full_name = " ".join([first, last]) - else: - full_name = first or last - slug_name = cv.slugify(full_name) - include_member = _include_name(self._members_filter, slug_name) - dev_id = self._dev_id(slug_name) - if member_id not in self._members_logged: - self._members_logged.add(member_id) - _LOGGER.debug( - "%s -> %s: will%s be tracked, id=%s", - full_name, - dev_id, - "" if include_member else " NOT", - member_id, - ) - sharing = bool(int(member["features"]["shareLocation"])) - except (KeyError, TypeError, ValueError, vol.Invalid): - self._err(err_key, member) - continue - self._ok(err_key) - - if include_member and sharing: - members_updated.append(member_id) - self._update_member(member, dev_id) - - def _update_life360(self, now=None): - circles_updated = [] - members_updated = [] - - for api in self._apis.values(): - err_key = "get_circles" - try: - circles = api.get_circles() - except Life360Error as exc: - self._exc(err_key, exc) - continue - self._ok(err_key) - - for circle in circles: - circle_id = circle["id"] - if circle_id in circles_updated: - continue - circles_updated.append(circle_id) - circle_name = circle["name"] - incl_circle = _include_name(self._circles_filter, circle_name) - if circle_id not in self._circles_logged: - self._circles_logged.add(circle_id) - _LOGGER.debug( - "%s Circle: will%s be included, id=%s", - circle_name, - "" if incl_circle else " NOT", - circle_id, - ) - try: - places = api.get_circle_places(circle_id) - place_data = "Circle's Places:" - for place in places: - place_data += f"\n- name: {place['name']}" - place_data += f"\n latitude: {place['latitude']}" - place_data += f"\n longitude: {place['longitude']}" - place_data += f"\n radius: {place['radius']}" - if not places: - place_data += " None" - _LOGGER.debug(place_data) - except (Life360Error, KeyError): - pass - if incl_circle: - err_key = f'get_circle_members "{circle_name}"' - try: - members = api.get_circle_members(circle_id) - except Life360Error as exc: - self._exc(err_key, exc) - continue - self._ok(err_key) - - self._update_members(members, members_updated) + @property + def extra_state_attributes(self) -> Mapping[str, Any] | None: + """Return entity specific state attributes.""" + attrs = {} + attrs[ATTR_ADDRESS] = self._data.address + attrs[ATTR_AT_LOC_SINCE] = self._data.at_loc_since + attrs[ATTR_BATTERY_CHARGING] = self._data.battery_charging + attrs[ATTR_DRIVING] = self.driving + attrs[ATTR_LAST_SEEN] = self._data.last_seen + attrs[ATTR_PLACE] = self._data.place + attrs[ATTR_SPEED] = self._data.speed + attrs[ATTR_WIFI_ON] = self._data.wifi_on + return attrs diff --git a/homeassistant/components/life360/helpers.py b/homeassistant/components/life360/helpers.py deleted file mode 100644 index 0eb215743df..00000000000 --- a/homeassistant/components/life360/helpers.py +++ /dev/null @@ -1,7 +0,0 @@ -"""Life360 integration helpers.""" -from life360 import Life360 - - -def get_api(authorization=None): - """Create Life360 api object.""" - return Life360(timeout=3.05, max_retries=2, authorization=authorization) diff --git a/homeassistant/components/life360/strings.json b/homeassistant/components/life360/strings.json index 06ac88467ef..cc31ca64a08 100644 --- a/homeassistant/components/life360/strings.json +++ b/homeassistant/components/life360/strings.json @@ -2,26 +2,43 @@ "config": { "step": { "user": { - "title": "Life360 Account Info", + "title": "Configure Life360 Account", "data": { "username": "[%key:common::config_flow::data::username%]", "password": "[%key:common::config_flow::data::password%]" - }, - "description": "To set advanced options, see [Life360 documentation]({docs_url}).\nYou may want to do that before adding accounts." + } + }, + "reauth_confirm": { + "title": "[%key:common::config_flow::title::reauth%]", + "data": { + "password": "[%key:common::config_flow::data::password%]" + } } }, "error": { - "invalid_username": "Invalid username", "invalid_auth": "[%key:common::config_flow::error::invalid_auth%]", "already_configured": "[%key:common::config_flow::abort::already_configured_account%]", + "cannot_connect": "[%key:common::config_flow::error::cannot_connect%]", "unknown": "[%key:common::config_flow::error::unknown%]" }, - "create_entry": { - "default": "To set advanced options, see [Life360 documentation]({docs_url})." - }, "abort": { "invalid_auth": "[%key:common::config_flow::error::invalid_auth%]", - "unknown": "[%key:common::config_flow::error::unknown%]" + "already_configured": "[%key:common::config_flow::abort::already_configured_account%]", + "reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]" + } + }, + "options": { + "step": { + "init": { + "title": "Account Options", + "data": { + "limit_gps_acc": "Limit GPS accuracy", + "max_gps_accuracy": "Max GPS accuracy (meters)", + "set_drive_speed": "Set driving speed threshold", + "driving_speed": "Driving speed", + "driving": "Show driving as state" + } + } } } } diff --git a/homeassistant/components/life360/translations/en.json b/homeassistant/components/life360/translations/en.json index fa836c62b61..b4c9eb452f6 100644 --- a/homeassistant/components/life360/translations/en.json +++ b/homeassistant/components/life360/translations/en.json @@ -1,27 +1,44 @@ { - "config": { - "abort": { - "invalid_auth": "Invalid authentication", - "unknown": "Unexpected error" - }, - "create_entry": { - "default": "To set advanced options, see [Life360 documentation]({docs_url})." - }, - "error": { - "already_configured": "Account is already configured", - "invalid_auth": "Invalid authentication", - "invalid_username": "Invalid username", - "unknown": "Unexpected error" - }, - "step": { - "user": { - "data": { - "password": "Password", - "username": "Username" - }, - "description": "To set advanced options, see [Life360 documentation]({docs_url}).\nYou may want to do that before adding accounts.", - "title": "Life360 Account Info" - } + "config": { + "step": { + "user": { + "title": "Configure Life360 Account", + "data": { + "username": "Username", + "password": "Password" } + }, + "reauth_confirm": { + "title": "Reauthenticate Integration", + "data": { + "password": "Password" + } + } + }, + "error": { + "invalid_auth": "Invalid authentication", + "already_configured": "Account is already configured", + "cannot_connect": "Failed to connect", + "unknown": "Unexpected error" + }, + "abort": { + "invalid_auth": "Invalid authentication", + "already_configured": "Account is already configured", + "reauth_successful": "Re-authentication was successful" } -} \ No newline at end of file + }, + "options": { + "step": { + "init": { + "title": "Account Options", + "data": { + "limit_gps_acc": "Limit GPS accuracy", + "max_gps_accuracy": "Max GPS accuracy (meters)", + "set_drive_speed": "Set driving speed threshold", + "driving_speed": "Driving speed", + "driving": "Show driving as state" + } + } + } + } +} diff --git a/requirements_test_all.txt b/requirements_test_all.txt index f579b9395d9..94d59b40526 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -672,6 +672,9 @@ librouteros==3.2.0 # homeassistant.components.soundtouch libsoundtouch==0.8 +# homeassistant.components.life360 +life360==4.1.1 + # homeassistant.components.logi_circle logi_circle==0.2.3 diff --git a/tests/components/life360/__init__.py b/tests/components/life360/__init__.py new file mode 100644 index 00000000000..0f68b4a343c --- /dev/null +++ b/tests/components/life360/__init__.py @@ -0,0 +1 @@ +"""Tests for the Life360 integration.""" diff --git a/tests/components/life360/test_config_flow.py b/tests/components/life360/test_config_flow.py new file mode 100644 index 00000000000..0b5b850ac23 --- /dev/null +++ b/tests/components/life360/test_config_flow.py @@ -0,0 +1,309 @@ +"""Test the Life360 config flow.""" + +from unittest.mock import patch + +from life360 import Life360Error, LoginError +import pytest +import voluptuous as vol + +from homeassistant import config_entries, data_entry_flow +from homeassistant.components.life360.const import ( + CONF_AUTHORIZATION, + CONF_DRIVING_SPEED, + CONF_MAX_GPS_ACCURACY, + DEFAULT_OPTIONS, + DOMAIN, + SHOW_DRIVING, +) +from homeassistant.const import CONF_PASSWORD, CONF_USERNAME + +from tests.common import MockConfigEntry + +TEST_USER = "Test@Test.com" +TEST_PW = "password" +TEST_PW_3 = "password_3" +TEST_AUTHORIZATION = "authorization_string" +TEST_AUTHORIZATION_2 = "authorization_string_2" +TEST_AUTHORIZATION_3 = "authorization_string_3" +TEST_MAX_GPS_ACCURACY = "300" +TEST_DRIVING_SPEED = "18" +TEST_SHOW_DRIVING = True + +USER_INPUT = {CONF_USERNAME: TEST_USER, CONF_PASSWORD: TEST_PW} + +TEST_CONFIG_DATA = { + CONF_USERNAME: TEST_USER, + CONF_PASSWORD: TEST_PW, + CONF_AUTHORIZATION: TEST_AUTHORIZATION, +} +TEST_CONFIG_DATA_2 = { + CONF_USERNAME: TEST_USER, + CONF_PASSWORD: TEST_PW, + CONF_AUTHORIZATION: TEST_AUTHORIZATION_2, +} +TEST_CONFIG_DATA_3 = { + CONF_USERNAME: TEST_USER, + CONF_PASSWORD: TEST_PW_3, + CONF_AUTHORIZATION: TEST_AUTHORIZATION_3, +} + +USER_OPTIONS = { + "limit_gps_acc": True, + CONF_MAX_GPS_ACCURACY: TEST_MAX_GPS_ACCURACY, + "set_drive_speed": True, + CONF_DRIVING_SPEED: TEST_DRIVING_SPEED, + SHOW_DRIVING: TEST_SHOW_DRIVING, +} +TEST_OPTIONS = { + CONF_MAX_GPS_ACCURACY: float(TEST_MAX_GPS_ACCURACY), + CONF_DRIVING_SPEED: float(TEST_DRIVING_SPEED), + SHOW_DRIVING: TEST_SHOW_DRIVING, +} + + +# ========== Common Fixtures & Functions =============================================== + + +@pytest.fixture(name="life360", autouse=True) +def life360_fixture(): + """Mock life360 config entry setup & unload.""" + with patch( + "homeassistant.components.life360.async_setup_entry", return_value=True + ), patch("homeassistant.components.life360.async_unload_entry", return_value=True): + yield + + +@pytest.fixture +def life360_api(): + """Mock Life360 api.""" + with patch("homeassistant.components.life360.config_flow.Life360") as mock: + yield mock.return_value + + +def create_config_entry(hass, state=None): + """Create mock config entry.""" + config_entry = MockConfigEntry( + domain=DOMAIN, + data=TEST_CONFIG_DATA, + version=1, + state=state, + options=DEFAULT_OPTIONS, + unique_id=TEST_USER.lower(), + ) + config_entry.add_to_hass(hass) + return config_entry + + +# ========== User Flow Tests =========================================================== + + +async def test_user_show_form(hass, life360_api): + """Test that the form is served with no input.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + await hass.async_block_till_done() + + life360_api.get_authorization.assert_not_called() + + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + assert not result["errors"] + + schema = result["data_schema"].schema + assert set(schema) == set(USER_INPUT) + # username and password fields should be empty. + keys = list(schema) + for key in USER_INPUT: + assert keys[keys.index(key)].default == vol.UNDEFINED + + +async def test_user_config_flow_success(hass, life360_api): + """Test a successful user config flow.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + await hass.async_block_till_done() + + life360_api.get_authorization.return_value = TEST_AUTHORIZATION + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], USER_INPUT + ) + await hass.async_block_till_done() + + life360_api.get_authorization.assert_called_once() + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["title"] == TEST_USER.lower() + assert result["data"] == TEST_CONFIG_DATA + assert result["options"] == DEFAULT_OPTIONS + + +@pytest.mark.parametrize( + "exception,error", [(LoginError, "invalid_auth"), (Life360Error, "cannot_connect")] +) +async def test_user_config_flow_error(hass, life360_api, caplog, exception, error): + """Test a user config flow with an error.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + await hass.async_block_till_done() + + life360_api.get_authorization.side_effect = exception("test reason") + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], USER_INPUT + ) + await hass.async_block_till_done() + + life360_api.get_authorization.assert_called_once() + + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "user" + assert result["errors"] + assert result["errors"]["base"] == error + + assert "test reason" in caplog.text + + schema = result["data_schema"].schema + assert set(schema) == set(USER_INPUT) + # username and password fields should be prefilled with current values. + keys = list(schema) + for key, val in USER_INPUT.items(): + default = keys[keys.index(key)].default + assert default != vol.UNDEFINED + assert default() == val + + +async def test_user_config_flow_already_configured(hass, life360_api): + """Test a user config flow with an account already configured.""" + create_config_entry(hass) + + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": config_entries.SOURCE_USER} + ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], USER_INPUT + ) + await hass.async_block_till_done() + + life360_api.get_authorization.assert_not_called() + + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "already_configured" + + +# ========== Reauth Flow Tests ========================================================= + + +@pytest.mark.parametrize("state", [None, config_entries.ConfigEntryState.LOADED]) +async def test_reauth_config_flow_success(hass, life360_api, caplog, state): + """Test a successful reauthorization config flow.""" + config_entry = create_config_entry(hass, state=state) + + # Simulate current username & password are still valid, but authorization string has + # expired, such that getting a new authorization string from server is successful. + life360_api.get_authorization.return_value = TEST_AUTHORIZATION_2 + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={ + "source": config_entries.SOURCE_REAUTH, + "entry_id": config_entry.entry_id, + "title_placeholders": {"name": config_entry.title}, + "unique_id": config_entry.unique_id, + }, + data=config_entry.data, + ) + await hass.async_block_till_done() + + life360_api.get_authorization.assert_called_once() + + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "reauth_successful" + + assert "Reauthorization successful" in caplog.text + + assert config_entry.data == TEST_CONFIG_DATA_2 + + +async def test_reauth_config_flow_login_error(hass, life360_api, caplog): + """Test a reauthorization config flow with a login error.""" + config_entry = create_config_entry(hass) + + # Simulate current username & password are invalid, which results in a form + # requesting new password, with old password as default value. + life360_api.get_authorization.side_effect = LoginError("test reason") + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={ + "source": config_entries.SOURCE_REAUTH, + "entry_id": config_entry.entry_id, + "title_placeholders": {"name": config_entry.title}, + "unique_id": config_entry.unique_id, + }, + data=config_entry.data, + ) + await hass.async_block_till_done() + + life360_api.get_authorization.assert_called_once() + + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "reauth_confirm" + assert result["errors"] + assert result["errors"]["base"] == "invalid_auth" + + assert "test reason" in caplog.text + + schema = result["data_schema"].schema + assert len(schema) == 1 + assert "password" in schema + key = list(schema)[0] + assert key.default() == TEST_PW + + # Simulate getting a new, valid password. + life360_api.get_authorization.reset_mock(side_effect=True) + life360_api.get_authorization.return_value = TEST_AUTHORIZATION_3 + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], {CONF_PASSWORD: TEST_PW_3} + ) + await hass.async_block_till_done() + + life360_api.get_authorization.assert_called_once() + + assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT + assert result["reason"] == "reauth_successful" + + assert "Reauthorization successful" in caplog.text + + assert config_entry.data == TEST_CONFIG_DATA_3 + + +# ========== Option flow Tests ========================================================= + + +async def test_options_flow(hass): + """Test an options flow.""" + config_entry = create_config_entry(hass) + + await hass.config_entries.async_setup(config_entry.entry_id) + result = await hass.config_entries.options.async_init(config_entry.entry_id) + + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result["step_id"] == "init" + assert not result["errors"] + + schema = result["data_schema"].schema + assert set(schema) == set(USER_OPTIONS) + + flow_id = result["flow_id"] + + result = await hass.config_entries.options.async_configure(flow_id, USER_OPTIONS) + + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["data"] == TEST_OPTIONS + + assert config_entry.options == TEST_OPTIONS