Add config flow to iCloud (#28968)

* iCloud: setup ConfigFlow and prepare for more platforms

- add config flow + tests
- fix existing services
- add play_sound & display_message services
- document services
- can use devices with the same name
- prepare to add sensor platform

* Review : not copy account conf

* Review: Safer test patch

* Review: remove reset_account

* Review: Use executor_job while IO

* Review: Use executor_job while IO 2

* Review: use hass.helpers.storage.Store()

* Review: no IO in tests

* Remove reset from services.yaml

* Review: remove authenticate.return_value = Mock()

* Review: do not initialize the api with the mocked service

* isort

* Review: @MartinHjelmare Test config flow with all steps

* Review: Fix failed tests names

* Codevov: Add one missing test
This commit is contained in:
Quentame 2019-12-09 17:19:42 +01:00 committed by Martin Hjelmare
parent f60125b5c9
commit c804f8f961
14 changed files with 1471 additions and 564 deletions

View File

@ -319,7 +319,8 @@ omit =
homeassistant/components/iaqualink/light.py
homeassistant/components/iaqualink/sensor.py
homeassistant/components/iaqualink/switch.py
homeassistant/components/icloud/*
homeassistant/components/icloud/__init__.py
homeassistant/components/icloud/device_tracker.py
homeassistant/components/izone/climate.py
homeassistant/components/izone/discovery.py
homeassistant/components/izone/__init__.py

View File

@ -152,6 +152,7 @@ homeassistant/components/huawei_lte/* @scop
homeassistant/components/huawei_router/* @abmantis
homeassistant/components/hue/* @balloob
homeassistant/components/iaqualink/* @flz
homeassistant/components/icloud/* @Quentame
homeassistant/components/ign_sismologia/* @exxamalte
homeassistant/components/incomfort/* @zxdavb
homeassistant/components/influxdb/* @fabaff

View File

@ -0,0 +1,38 @@
{
"config": {
"abort": {
"username_exists": "Account already configured"
},
"error": {
"login": "Login error: please check your email & password",
"send_verification_code": "Failed to send verification code",
"username_exists": "Account already configured",
"validate_verification_code": "Failed to verify your verification code, choose a trust device and start the verification again"
},
"step": {
"trusted_device": {
"data": {
"trusted_device": "Trusted device"
},
"description": "Select your trusted device",
"title": "iCloud trusted device"
},
"user": {
"data": {
"password": "Password",
"username": "Email"
},
"description": "Enter your credentials",
"title": "iCloud credentials"
},
"verification_code": {
"data": {
"verification_code": "Verification code"
},
"description": "Please enter the verification code you just received from iCloud",
"title": "iCloud verification code"
}
},
"title": "Apple iCloud"
}
}

View File

@ -1 +1,606 @@
"""The icloud component."""
"""The iCloud component."""
from datetime import timedelta
import logging
import operator
from typing import Dict
from pyicloud import PyiCloudService
from pyicloud.exceptions import PyiCloudFailedLoginException, PyiCloudNoDevicesException
from pyicloud.services.findmyiphone import AppleDevice
import voluptuous as vol
from homeassistant.components.zone import async_active_zone
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import ATTR_ATTRIBUTION, CONF_PASSWORD, CONF_USERNAME
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import track_point_in_utc_time
from homeassistant.helpers.storage import Store
from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType
from homeassistant.util import slugify
from homeassistant.util.async_ import run_callback_threadsafe
from homeassistant.util.dt import utcnow
from homeassistant.util.location import distance
from .const import (
CONF_ACCOUNT_NAME,
CONF_GPS_ACCURACY_THRESHOLD,
CONF_MAX_INTERVAL,
DEFAULT_GPS_ACCURACY_THRESHOLD,
DEFAULT_MAX_INTERVAL,
DEVICE_BATTERY_LEVEL,
DEVICE_BATTERY_STATUS,
DEVICE_CLASS,
DEVICE_DISPLAY_NAME,
DEVICE_ID,
DEVICE_LOCATION,
DEVICE_LOCATION_LATITUDE,
DEVICE_LOCATION_LONGITUDE,
DEVICE_LOST_MODE_CAPABLE,
DEVICE_LOW_POWER_MODE,
DEVICE_NAME,
DEVICE_PERSON_ID,
DEVICE_RAW_DEVICE_MODEL,
DEVICE_STATUS,
DEVICE_STATUS_CODES,
DEVICE_STATUS_SET,
DOMAIN,
ICLOUD_COMPONENTS,
STORAGE_KEY,
STORAGE_VERSION,
TRACKER_UPDATE,
)
ATTRIBUTION = "Data provided by Apple iCloud"
# entity attributes
ATTR_ACCOUNT_FETCH_INTERVAL = "account_fetch_interval"
ATTR_BATTERY = "battery"
ATTR_BATTERY_STATUS = "battery_status"
ATTR_DEVICE_NAME = "device_name"
ATTR_DEVICE_STATUS = "device_status"
ATTR_LOW_POWER_MODE = "low_power_mode"
ATTR_OWNER_NAME = "owner_fullname"
# services
SERVICE_ICLOUD_PLAY_SOUND = "play_sound"
SERVICE_ICLOUD_DISPLAY_MESSAGE = "display_message"
SERVICE_ICLOUD_LOST_DEVICE = "lost_device"
SERVICE_ICLOUD_UPDATE = "update"
ATTR_ACCOUNT = "account"
ATTR_LOST_DEVICE_MESSAGE = "message"
ATTR_LOST_DEVICE_NUMBER = "number"
ATTR_LOST_DEVICE_SOUND = "sound"
SERVICE_SCHEMA = vol.Schema({vol.Optional(ATTR_ACCOUNT): cv.string})
SERVICE_SCHEMA_PLAY_SOUND = vol.Schema(
{vol.Required(ATTR_ACCOUNT): cv.string, vol.Required(ATTR_DEVICE_NAME): cv.string}
)
SERVICE_SCHEMA_DISPLAY_MESSAGE = vol.Schema(
{
vol.Required(ATTR_ACCOUNT): cv.string,
vol.Required(ATTR_DEVICE_NAME): cv.string,
vol.Required(ATTR_LOST_DEVICE_MESSAGE): cv.string,
vol.Optional(ATTR_LOST_DEVICE_SOUND): cv.boolean,
}
)
SERVICE_SCHEMA_LOST_DEVICE = vol.Schema(
{
vol.Required(ATTR_ACCOUNT): cv.string,
vol.Required(ATTR_DEVICE_NAME): cv.string,
vol.Required(ATTR_LOST_DEVICE_NUMBER): cv.string,
vol.Required(ATTR_LOST_DEVICE_MESSAGE): cv.string,
}
)
ACCOUNT_SCHEMA = vol.Schema(
{
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_ACCOUNT_NAME): cv.string,
vol.Optional(CONF_MAX_INTERVAL, default=DEFAULT_MAX_INTERVAL): cv.positive_int,
vol.Optional(
CONF_GPS_ACCURACY_THRESHOLD, default=DEFAULT_GPS_ACCURACY_THRESHOLD
): cv.positive_int,
}
)
CONFIG_SCHEMA = vol.Schema(
{DOMAIN: vol.Schema(vol.All(cv.ensure_list, [ACCOUNT_SCHEMA]))},
extra=vol.ALLOW_EXTRA,
)
_LOGGER = logging.getLogger(__name__)
async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool:
"""Set up iCloud from legacy config file."""
conf = config.get(DOMAIN)
if conf is None:
return True
for account_conf in conf:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=account_conf
)
)
return True
async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry) -> bool:
"""Set up an iCloud account from a config entry."""
hass.data.setdefault(DOMAIN, {})
username = entry.data[CONF_USERNAME]
password = entry.data[CONF_PASSWORD]
account_name = entry.data.get(CONF_ACCOUNT_NAME)
max_interval = entry.data[CONF_MAX_INTERVAL]
gps_accuracy_threshold = entry.data[CONF_GPS_ACCURACY_THRESHOLD]
icloud_dir = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY)
account = IcloudAccount(
hass,
username,
password,
icloud_dir,
account_name,
max_interval,
gps_accuracy_threshold,
)
await hass.async_add_executor_job(account.setup)
hass.data[DOMAIN][username] = account
for component in ICLOUD_COMPONENTS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, component)
)
def play_sound(service: ServiceDataType) -> None:
"""Play sound on the device."""
account = service.data[ATTR_ACCOUNT]
device_name = service.data.get(ATTR_DEVICE_NAME)
device_name = slugify(device_name.replace(" ", "", 99))
for device in _get_account(account).get_devices_with_name(device_name):
device.play_sound()
def display_message(service: ServiceDataType) -> None:
"""Display a message on the device."""
account = service.data[ATTR_ACCOUNT]
device_name = service.data.get(ATTR_DEVICE_NAME)
device_name = slugify(device_name.replace(" ", "", 99))
message = service.data.get(ATTR_LOST_DEVICE_MESSAGE)
sound = service.data.get(ATTR_LOST_DEVICE_SOUND, False)
for device in _get_account(account).get_devices_with_name(device_name):
device.display_message(message, sound)
def lost_device(service: ServiceDataType) -> None:
"""Make the device in lost state."""
account = service.data[ATTR_ACCOUNT]
device_name = service.data.get(ATTR_DEVICE_NAME)
device_name = slugify(device_name.replace(" ", "", 99))
number = service.data.get(ATTR_LOST_DEVICE_NUMBER)
message = service.data.get(ATTR_LOST_DEVICE_MESSAGE)
for device in _get_account(account).get_devices_with_name(device_name):
device.lost_device(number, message)
def update_account(service: ServiceDataType) -> None:
"""Call the update function of an iCloud account."""
account = service.data.get(ATTR_ACCOUNT)
if account is None:
for account in hass.data[DOMAIN].values():
account.keep_alive()
else:
_get_account(account).keep_alive()
def _get_account(account_identifier: str) -> any:
if account_identifier is None:
return None
icloud_account = hass.data[DOMAIN].get(account_identifier, None)
if icloud_account is None:
for account in hass.data[DOMAIN].values():
if account.name == account_identifier:
icloud_account = account
if icloud_account is None:
raise Exception(
"No iCloud account with username or name " + account_identifier
)
return icloud_account
hass.services.async_register(
DOMAIN, SERVICE_ICLOUD_PLAY_SOUND, play_sound, schema=SERVICE_SCHEMA_PLAY_SOUND
)
hass.services.async_register(
DOMAIN,
SERVICE_ICLOUD_DISPLAY_MESSAGE,
display_message,
schema=SERVICE_SCHEMA_DISPLAY_MESSAGE,
)
hass.services.async_register(
DOMAIN,
SERVICE_ICLOUD_LOST_DEVICE,
lost_device,
schema=SERVICE_SCHEMA_LOST_DEVICE,
)
hass.services.async_register(
DOMAIN, SERVICE_ICLOUD_UPDATE, update_account, schema=SERVICE_SCHEMA
)
return True
class IcloudAccount:
"""Representation of an iCloud account."""
def __init__(
self,
hass: HomeAssistantType,
username: str,
password: str,
icloud_dir: Store,
account_name: str,
max_interval: int,
gps_accuracy_threshold: int,
):
"""Initialize an iCloud account."""
self.hass = hass
self._username = username
self._password = password
self._name = account_name or slugify(username.partition("@")[0])
self._fetch_interval = max_interval
self._max_interval = max_interval
self._gps_accuracy_threshold = gps_accuracy_threshold
self._icloud_dir = icloud_dir
self.api = None
self._owner_fullname = None
self._family_members_fullname = {}
self._devices = {}
self.unsub_device_tracker = None
def setup(self):
"""Set up an iCloud account."""
try:
self.api = PyiCloudService(
self._username, self._password, self._icloud_dir.path
)
except PyiCloudFailedLoginException as error:
self.api = None
_LOGGER.error("Error logging into iCloud Service: %s", error)
return
user_info = None
try:
# Gets device owners infos
user_info = self.api.devices.response["userInfo"]
except PyiCloudNoDevicesException:
_LOGGER.error("No iCloud Devices found")
self._owner_fullname = f"{user_info['firstName']} {user_info['lastName']}"
self._family_members_fullname = {}
for prs_id, member in user_info["membersInfo"].items():
self._family_members_fullname[
prs_id
] = f"{member['firstName']} {member['lastName']}"
self._devices = {}
self.update_devices()
def update_devices(self) -> None:
"""Update iCloud devices."""
if self.api is None:
return
api_devices = {}
try:
api_devices = self.api.devices
except PyiCloudNoDevicesException:
_LOGGER.error("No iCloud Devices found")
# Gets devices infos
for device in api_devices:
status = device.status(DEVICE_STATUS_SET)
device_id = status[DEVICE_ID]
device_name = status[DEVICE_NAME]
if self._devices.get(device_id, None) is not None:
# Seen device -> updating
_LOGGER.debug("Updating iCloud device: %s", device_name)
self._devices[device_id].update(status)
else:
# New device, should be unique
_LOGGER.debug(
"Adding iCloud device: %s [model: %s]",
device_name,
status[DEVICE_RAW_DEVICE_MODEL],
)
self._devices[device_id] = IcloudDevice(self, device, status)
self._devices[device_id].update(status)
dispatcher_send(self.hass, TRACKER_UPDATE)
self._fetch_interval = self._determine_interval()
track_point_in_utc_time(
self.hass,
self.keep_alive,
utcnow() + timedelta(minutes=self._fetch_interval),
)
def _determine_interval(self) -> int:
"""Calculate new interval between two API fetch (in minutes)."""
intervals = {}
for device in self._devices.values():
if device.location is None:
continue
current_zone = run_callback_threadsafe(
self.hass.loop,
async_active_zone,
self.hass,
device.location[DEVICE_LOCATION_LATITUDE],
device.location[DEVICE_LOCATION_LONGITUDE],
).result()
if current_zone is not None:
intervals[device.name] = self._max_interval
continue
zones = (
self.hass.states.get(entity_id)
for entity_id in sorted(self.hass.states.entity_ids("zone"))
)
distances = []
for zone_state in zones:
zone_state_lat = zone_state.attributes[DEVICE_LOCATION_LATITUDE]
zone_state_long = zone_state.attributes[DEVICE_LOCATION_LONGITUDE]
zone_distance = distance(
device.location[DEVICE_LOCATION_LATITUDE],
device.location[DEVICE_LOCATION_LONGITUDE],
zone_state_lat,
zone_state_long,
)
distances.append(round(zone_distance / 1000, 1))
if not distances:
continue
mindistance = min(distances)
# Calculate out how long it would take for the device to drive
# to the nearest zone at 120 km/h:
interval = round(mindistance / 2, 0)
# Never poll more than once per minute
interval = max(interval, 1)
if interval > 180:
# Three hour drive?
# This is far enough that they might be flying
interval = self._max_interval
if (
device.battery_level is not None
and device.battery_level <= 33
and mindistance > 3
):
# Low battery - let's check half as often
interval = interval * 2
intervals[device.name] = interval
return max(
int(min(intervals.items(), key=operator.itemgetter(1))[1]),
self._max_interval,
)
def keep_alive(self, now=None) -> None:
"""Keep the API alive."""
if self.api is None:
self.setup()
if self.api is None:
return
self.api.authenticate()
self.update_devices()
def get_devices_with_name(self, name: str) -> [any]:
"""Get devices by name."""
result = []
name_slug = slugify(name.replace(" ", "", 99))
for device in self.devices.values():
if slugify(device.name.replace(" ", "", 99)) == name_slug:
result.append(device)
if not result:
raise Exception("No device with name " + name)
return result
@property
def name(self) -> str:
"""Return the account name."""
return self._name
@property
def username(self) -> str:
"""Return the account username."""
return self._username
@property
def owner_fullname(self) -> str:
"""Return the account owner fullname."""
return self._owner_fullname
@property
def family_members_fullname(self) -> Dict[str, str]:
"""Return the account family members fullname."""
return self._family_members_fullname
@property
def fetch_interval(self) -> int:
"""Return the account fetch interval."""
return self._fetch_interval
@property
def devices(self) -> Dict[str, any]:
"""Return the account devices."""
return self._devices
class IcloudDevice:
"""Representation of a iCloud device."""
def __init__(self, account: IcloudAccount, device: AppleDevice, status):
"""Initialize the iCloud device."""
self._account = account
account_name = account.name
self._device = device
self._status = status
self._name = self._status[DEVICE_NAME]
self._device_id = self._status[DEVICE_ID]
self._device_class = self._status[DEVICE_CLASS]
self._device_model = self._status[DEVICE_DISPLAY_NAME]
if self._status[DEVICE_PERSON_ID]:
owner_fullname = account.family_members_fullname[
self._status[DEVICE_PERSON_ID]
]
else:
owner_fullname = account.owner_fullname
self._battery_level = None
self._battery_status = None
self._location = None
self._attrs = {
ATTR_ATTRIBUTION: ATTRIBUTION,
CONF_ACCOUNT_NAME: account_name,
ATTR_ACCOUNT_FETCH_INTERVAL: self._account.fetch_interval,
ATTR_DEVICE_NAME: self._device_model,
ATTR_DEVICE_STATUS: None,
ATTR_OWNER_NAME: owner_fullname,
}
def update(self, status) -> None:
"""Update the iCloud device."""
self._status = status
self._status[ATTR_ACCOUNT_FETCH_INTERVAL] = self._account.fetch_interval
device_status = DEVICE_STATUS_CODES.get(self._status[DEVICE_STATUS], "error")
self._attrs[ATTR_DEVICE_STATUS] = device_status
if self._status[DEVICE_BATTERY_STATUS] != "Unknown":
self._battery_level = int(self._status.get(DEVICE_BATTERY_LEVEL, 0) * 100)
self._battery_status = self._status[DEVICE_BATTERY_STATUS]
low_power_mode = self._status[DEVICE_LOW_POWER_MODE]
self._attrs[ATTR_BATTERY] = self._battery_level
self._attrs[ATTR_BATTERY_STATUS] = self._battery_status
self._attrs[ATTR_LOW_POWER_MODE] = low_power_mode
if (
self._status[DEVICE_LOCATION]
and self._status[DEVICE_LOCATION][DEVICE_LOCATION_LATITUDE]
):
location = self._status[DEVICE_LOCATION]
self._location = location
def play_sound(self) -> None:
"""Play sound on the device."""
if self._account.api is None:
return
self._account.api.authenticate()
_LOGGER.debug("Playing sound for %s", self.name)
self.device.play_sound()
def display_message(self, message: str, sound: bool = False) -> None:
"""Display a message on the device."""
if self._account.api is None:
return
self._account.api.authenticate()
_LOGGER.debug("Displaying message for %s", self.name)
self.device.display_message("Subject not working", message, sound)
def lost_device(self, number: str, message: str) -> None:
"""Make the device in lost state."""
if self._account.api is None:
return
self._account.api.authenticate()
if self._status[DEVICE_LOST_MODE_CAPABLE]:
_LOGGER.debug("Make device lost for %s", self.name)
self.device.lost_device(number, message, None)
else:
_LOGGER.error("Cannot make device lost for %s", self.name)
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return self._device_id
@property
def dev_id(self) -> str:
"""Return the device ID."""
return self._device_id
@property
def name(self) -> str:
"""Return the Apple device name."""
return self._name
@property
def device(self) -> AppleDevice:
"""Return the Apple device."""
return self._device
@property
def device_class(self) -> str:
"""Return the Apple device class."""
return self._device_class
@property
def device_model(self) -> str:
"""Return the Apple device model."""
return self._device_model
@property
def battery_level(self) -> int:
"""Return the Apple device battery level."""
return self._battery_level
@property
def battery_status(self) -> str:
"""Return the Apple device battery status."""
return self._battery_status
@property
def location(self) -> Dict[str, any]:
"""Return the Apple device location."""
return self._location
@property
def state_attributes(self) -> Dict[str, any]:
"""Return the attributes."""
return self._attrs

View File

@ -0,0 +1,230 @@
"""Config flow to configure the iCloud integration."""
import logging
import os
from pyicloud import PyiCloudService
from pyicloud.exceptions import PyiCloudException, PyiCloudFailedLoginException
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import slugify
from .const import (
CONF_ACCOUNT_NAME,
CONF_GPS_ACCURACY_THRESHOLD,
CONF_MAX_INTERVAL,
DEFAULT_GPS_ACCURACY_THRESHOLD,
DEFAULT_MAX_INTERVAL,
STORAGE_KEY,
STORAGE_VERSION,
)
from .const import DOMAIN # pylint: disable=unused-import
CONF_TRUSTED_DEVICE = "trusted_device"
CONF_VERIFICATION_CODE = "verification_code"
_LOGGER = logging.getLogger(__name__)
class IcloudFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a iCloud config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL
def __init__(self):
"""Initialize iCloud config flow."""
self.api = None
self._username = None
self._password = None
self._account_name = None
self._max_interval = None
self._gps_accuracy_threshold = None
self._trusted_device = None
self._verification_code = None
def _configuration_exists(self, username: str, account_name: str) -> bool:
"""Return True if username or account_name exists in configuration."""
for entry in self._async_current_entries():
if (
entry.data[CONF_USERNAME] == username
or entry.data.get(CONF_ACCOUNT_NAME) == account_name
or slugify(entry.data[CONF_USERNAME].partition("@")[0]) == account_name
):
return True
return False
async def _show_setup_form(self, user_input=None, errors=None):
"""Show the setup form to the user."""
if user_input is None:
user_input = {}
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(
CONF_USERNAME, default=user_input.get(CONF_USERNAME, "")
): str,
vol.Required(
CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "")
): str,
}
),
errors=errors or {},
)
async def async_step_user(self, user_input=None):
"""Handle a flow initiated by the user."""
errors = {}
icloud_dir = self.hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY)
if not os.path.exists(icloud_dir.path):
await self.hass.async_add_executor_job(os.makedirs, icloud_dir.path)
if user_input is None:
return await self._show_setup_form(user_input, errors)
self._username = user_input[CONF_USERNAME]
self._password = user_input[CONF_PASSWORD]
self._account_name = user_input.get(CONF_ACCOUNT_NAME)
self._max_interval = user_input.get(CONF_MAX_INTERVAL, DEFAULT_MAX_INTERVAL)
self._gps_accuracy_threshold = user_input.get(
CONF_GPS_ACCURACY_THRESHOLD, DEFAULT_GPS_ACCURACY_THRESHOLD
)
if self._configuration_exists(self._username, self._account_name):
errors[CONF_USERNAME] = "username_exists"
return await self._show_setup_form(user_input, errors)
try:
self.api = await self.hass.async_add_executor_job(
PyiCloudService, self._username, self._password, icloud_dir.path
)
except PyiCloudFailedLoginException as error:
_LOGGER.error("Error logging into iCloud service: %s", error)
self.api = None
errors[CONF_USERNAME] = "login"
return await self._show_setup_form(user_input, errors)
if self.api.requires_2fa:
return await self.async_step_trusted_device()
return self.async_create_entry(
title=self._username,
data={
CONF_USERNAME: self._username,
CONF_PASSWORD: self._password,
CONF_ACCOUNT_NAME: self._account_name,
CONF_MAX_INTERVAL: self._max_interval,
CONF_GPS_ACCURACY_THRESHOLD: self._gps_accuracy_threshold,
},
)
async def async_step_import(self, user_input):
"""Import a config entry."""
if self._configuration_exists(
user_input[CONF_USERNAME], user_input.get(CONF_ACCOUNT_NAME)
):
return self.async_abort(reason="username_exists")
return await self.async_step_user(user_input)
async def async_step_trusted_device(self, user_input=None, errors=None):
"""We need a trusted device."""
if errors is None:
errors = {}
trusted_devices = await self.hass.async_add_executor_job(
getattr, self.api, "trusted_devices"
)
trusted_devices_for_form = {}
for i, device in enumerate(trusted_devices):
trusted_devices_for_form[i] = device.get(
"deviceName", f"SMS to {device.get('phoneNumber')}"
)
if user_input is None:
return await self._show_trusted_device_form(
trusted_devices_for_form, user_input, errors
)
self._trusted_device = trusted_devices[int(user_input[CONF_TRUSTED_DEVICE])]
if not await self.hass.async_add_executor_job(
self.api.send_verification_code, self._trusted_device
):
_LOGGER.error("Failed to send verification code")
self._trusted_device = None
errors[CONF_TRUSTED_DEVICE] = "send_verification_code"
return await self._show_trusted_device_form(
trusted_devices_for_form, user_input, errors
)
return await self.async_step_verification_code()
async def _show_trusted_device_form(
self, trusted_devices, user_input=None, errors=None
):
"""Show the trusted_device form to the user."""
return self.async_show_form(
step_id=CONF_TRUSTED_DEVICE,
data_schema=vol.Schema(
{
vol.Required(CONF_TRUSTED_DEVICE): vol.All(
vol.Coerce(int), vol.In(trusted_devices)
)
}
),
errors=errors or {},
)
async def async_step_verification_code(self, user_input=None):
"""Ask the verification code to the user."""
errors = {}
if user_input is None:
return await self._show_verification_code_form(user_input)
self._verification_code = user_input[CONF_VERIFICATION_CODE]
try:
if not await self.hass.async_add_executor_job(
self.api.validate_verification_code,
self._trusted_device,
self._verification_code,
):
raise PyiCloudException("The code you entered is not valid.")
except PyiCloudException as error:
# Reset to the initial 2FA state to allow the user to retry
_LOGGER.error("Failed to verify verification code: %s", error)
self._trusted_device = None
self._verification_code = None
errors["base"] = "validate_verification_code"
return await self.async_step_trusted_device(None, errors)
return await self.async_step_user(
{
CONF_USERNAME: self._username,
CONF_PASSWORD: self._password,
CONF_ACCOUNT_NAME: self._account_name,
CONF_MAX_INTERVAL: self._max_interval,
CONF_GPS_ACCURACY_THRESHOLD: self._gps_accuracy_threshold,
}
)
async def _show_verification_code_form(self, user_input=None):
"""Show the verification_code form to the user."""
return self.async_show_form(
step_id=CONF_VERIFICATION_CODE,
data_schema=vol.Schema({vol.Required(CONF_VERIFICATION_CODE): str}),
errors=None,
)

View File

@ -1,6 +1,85 @@
"""Constants for the iCloud component."""
"""iCloud component constants."""
DOMAIN = "icloud"
SERVICE_LOST_IPHONE = "lost_iphone"
SERVICE_UPDATE = "update"
SERVICE_RESET_ACCOUNT = "reset_account"
SERVICE_SET_INTERVAL = "set_interval"
TRACKER_UPDATE = f"{DOMAIN}_tracker_update"
CONF_ACCOUNT_NAME = "account_name"
CONF_MAX_INTERVAL = "max_interval"
CONF_GPS_ACCURACY_THRESHOLD = "gps_accuracy_threshold"
DEFAULT_MAX_INTERVAL = 30 # min
DEFAULT_GPS_ACCURACY_THRESHOLD = 500 # meters
# to store the cookie
STORAGE_KEY = DOMAIN
STORAGE_VERSION = 1
# Next PR will add sensor
ICLOUD_COMPONENTS = ["device_tracker"]
# pyicloud.AppleDevice status
DEVICE_BATTERY_LEVEL = "batteryLevel"
DEVICE_BATTERY_STATUS = "batteryStatus"
DEVICE_CLASS = "deviceClass"
DEVICE_DISPLAY_NAME = "deviceDisplayName"
DEVICE_ID = "id"
DEVICE_LOCATION = "location"
DEVICE_LOCATION_HORIZONTAL_ACCURACY = "horizontalAccuracy"
DEVICE_LOCATION_LATITUDE = "latitude"
DEVICE_LOCATION_LONGITUDE = "longitude"
DEVICE_LOST_MODE_CAPABLE = "lostModeCapable"
DEVICE_LOW_POWER_MODE = "lowPowerMode"
DEVICE_NAME = "name"
DEVICE_PERSON_ID = "prsId"
DEVICE_RAW_DEVICE_MODEL = "rawDeviceModel"
DEVICE_STATUS = "deviceStatus"
DEVICE_STATUS_SET = [
"features",
"maxMsgChar",
"darkWake",
"fmlyShare",
DEVICE_STATUS,
"remoteLock",
"activationLocked",
DEVICE_CLASS,
DEVICE_ID,
"deviceModel",
DEVICE_RAW_DEVICE_MODEL,
"passcodeLength",
"canWipeAfterLock",
"trackingInfo",
DEVICE_LOCATION,
"msg",
DEVICE_BATTERY_LEVEL,
"remoteWipe",
"thisDevice",
"snd",
DEVICE_PERSON_ID,
"wipeInProgress",
DEVICE_LOW_POWER_MODE,
"lostModeEnabled",
"isLocating",
DEVICE_LOST_MODE_CAPABLE,
"mesg",
DEVICE_NAME,
DEVICE_BATTERY_STATUS,
"lockedTimestamp",
"lostTimestamp",
"locationCapable",
DEVICE_DISPLAY_NAME,
"lostDevice",
"deviceColor",
"wipedTimestamp",
"modelDisplayName",
"locationEnabled",
"isMac",
"locFoundEnabled",
]
DEVICE_STATUS_CODES = {
"200": "online",
"201": "offline",
"203": "pending",
"204": "unregistered",
}

View File

@ -1,544 +1,132 @@
"""Platform that supports scanning iCloud."""
"""Support for tracking for iCloud devices."""
import logging
import os
import random
from pyicloud import PyiCloudService
from pyicloud.exceptions import (
PyiCloudException,
PyiCloudFailedLoginException,
PyiCloudNoDevicesException,
)
import voluptuous as vol
from homeassistant.components.device_tracker import PLATFORM_SCHEMA
from homeassistant.components.device_tracker.const import (
ATTR_ATTRIBUTES,
ENTITY_ID_FORMAT,
)
from homeassistant.components.device_tracker.legacy import DeviceScanner
from homeassistant.components.zone import async_active_zone
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import track_utc_time_change
from homeassistant.util import slugify
from homeassistant.util.async_ import run_callback_threadsafe
import homeassistant.util.dt as dt_util
from homeassistant.util.location import distance
from homeassistant.components.device_tracker import SOURCE_TYPE_GPS
from homeassistant.components.device_tracker.config_entry import TrackerEntity
from homeassistant.const import CONF_USERNAME
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.typing import HomeAssistantType
from . import IcloudDevice
from .const import (
DEVICE_LOCATION_HORIZONTAL_ACCURACY,
DEVICE_LOCATION_LATITUDE,
DEVICE_LOCATION_LONGITUDE,
DOMAIN,
SERVICE_LOST_IPHONE,
SERVICE_RESET_ACCOUNT,
SERVICE_SET_INTERVAL,
SERVICE_UPDATE,
TRACKER_UPDATE,
)
_LOGGER = logging.getLogger(__name__)
CONF_ACCOUNTNAME = "account_name"
CONF_MAX_INTERVAL = "max_interval"
CONF_GPS_ACCURACY_THRESHOLD = "gps_accuracy_threshold"
# entity attributes
ATTR_ACCOUNTNAME = "account_name"
ATTR_INTERVAL = "interval"
ATTR_DEVICENAME = "device_name"
ATTR_BATTERY = "battery"
ATTR_DISTANCE = "distance"
ATTR_DEVICESTATUS = "device_status"
ATTR_LOWPOWERMODE = "low_power_mode"
ATTR_BATTERYSTATUS = "battery_status"
ICLOUDTRACKERS = {}
_CONFIGURING = {}
DEVICESTATUSSET = [
"features",
"maxMsgChar",
"darkWake",
"fmlyShare",
"deviceStatus",
"remoteLock",
"activationLocked",
"deviceClass",
"id",
"deviceModel",
"rawDeviceModel",
"passcodeLength",
"canWipeAfterLock",
"trackingInfo",
"location",
"msg",
"batteryLevel",
"remoteWipe",
"thisDevice",
"snd",
"prsId",
"wipeInProgress",
"lowPowerMode",
"lostModeEnabled",
"isLocating",
"lostModeCapable",
"mesg",
"name",
"batteryStatus",
"lockedTimestamp",
"lostTimestamp",
"locationCapable",
"deviceDisplayName",
"lostDevice",
"deviceColor",
"wipedTimestamp",
"modelDisplayName",
"locationEnabled",
"isMac",
"locFoundEnabled",
]
DEVICESTATUSCODES = {
"200": "online",
"201": "offline",
"203": "pending",
"204": "unregistered",
}
SERVICE_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_ACCOUNTNAME): vol.All(cv.ensure_list, [cv.slugify]),
vol.Optional(ATTR_DEVICENAME): cv.slugify,
vol.Optional(ATTR_INTERVAL): cv.positive_int,
}
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(ATTR_ACCOUNTNAME): cv.slugify,
vol.Optional(CONF_MAX_INTERVAL, default=30): cv.positive_int,
vol.Optional(CONF_GPS_ACCURACY_THRESHOLD, default=1000): cv.positive_int,
}
)
async def async_setup_scanner(
hass: HomeAssistantType, config, see, discovery_info=None
):
"""Old way of setting up the iCloud tracker."""
pass
def setup_scanner(hass, config: dict, see, discovery_info=None):
"""Set up the iCloud Scanner."""
username = config.get(CONF_USERNAME)
password = config.get(CONF_PASSWORD)
account = config.get(CONF_ACCOUNTNAME, slugify(username.partition("@")[0]))
max_interval = config.get(CONF_MAX_INTERVAL)
gps_accuracy_threshold = config.get(CONF_GPS_ACCURACY_THRESHOLD)
async def async_setup_entry(hass: HomeAssistantType, entry, async_add_entities):
"""Configure a dispatcher connection based on a config entry."""
username = entry.data[CONF_USERNAME]
icloudaccount = Icloud(
hass, username, password, account, max_interval, gps_accuracy_threshold, see
)
for device in hass.data[DOMAIN][username].devices.values():
if device.location is None:
_LOGGER.debug("No position found for device %s", device.name)
continue
if icloudaccount.api is not None:
ICLOUDTRACKERS[account] = icloudaccount
_LOGGER.debug("Adding device_tracker for %s", device.name)
else:
_LOGGER.error("No ICLOUDTRACKERS added")
async_add_entities([IcloudTrackerEntity(device)])
class IcloudTrackerEntity(TrackerEntity):
"""Represent a tracked device."""
def __init__(self, device: IcloudDevice):
"""Set up the iCloud tracker entity."""
self._device = device
self._unsub_dispatcher = None
@property
def unique_id(self):
"""Return a unique ID."""
return f"{self._device.unique_id}_tracker"
@property
def name(self):
"""Return the name of the device."""
return self._device.name
@property
def location_accuracy(self):
"""Return the location accuracy of the device."""
return self._device.location[DEVICE_LOCATION_HORIZONTAL_ACCURACY]
@property
def latitude(self):
"""Return latitude value of the device."""
return self._device.location[DEVICE_LOCATION_LATITUDE]
@property
def longitude(self):
"""Return longitude value of the device."""
return self._device.location[DEVICE_LOCATION_LONGITUDE]
@property
def should_poll(self):
"""No polling needed."""
return False
def lost_iphone(call):
"""Call the lost iPhone function if the device is found."""
accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS)
devicename = call.data.get(ATTR_DEVICENAME)
for account in accounts:
if account in ICLOUDTRACKERS:
ICLOUDTRACKERS[account].lost_iphone(devicename)
@property
def battery_level(self):
"""Return the battery level of the device."""
return self._device.battery_level
hass.services.register(
DOMAIN, SERVICE_LOST_IPHONE, lost_iphone, schema=SERVICE_SCHEMA
)
@property
def source_type(self):
"""Return the source type, eg gps or router, of the device."""
return SOURCE_TYPE_GPS
def update_icloud(call):
"""Call the update function of an iCloud account."""
accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS)
devicename = call.data.get(ATTR_DEVICENAME)
for account in accounts:
if account in ICLOUDTRACKERS:
ICLOUDTRACKERS[account].update_icloud(devicename)
@property
def icon(self):
"""Return the icon."""
return icon_for_icloud_device(self._device)
hass.services.register(DOMAIN, SERVICE_UPDATE, update_icloud, schema=SERVICE_SCHEMA)
@property
def device_state_attributes(self):
"""Return the device state attributes."""
return self._device.state_attributes
def reset_account_icloud(call):
"""Reset an iCloud account."""
accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS)
for account in accounts:
if account in ICLOUDTRACKERS:
ICLOUDTRACKERS[account].reset_account_icloud()
@property
def device_info(self):
"""Return the device information."""
return {
"identifiers": {(DOMAIN, self.unique_id)},
"name": self.name,
"manufacturer": "Apple",
"model": self._device.device_model,
}
hass.services.register(
DOMAIN, SERVICE_RESET_ACCOUNT, reset_account_icloud, schema=SERVICE_SCHEMA
)
def setinterval(call):
"""Call the update function of an iCloud account."""
accounts = call.data.get(ATTR_ACCOUNTNAME, ICLOUDTRACKERS)
interval = call.data.get(ATTR_INTERVAL)
devicename = call.data.get(ATTR_DEVICENAME)
for account in accounts:
if account in ICLOUDTRACKERS:
ICLOUDTRACKERS[account].setinterval(interval, devicename)
hass.services.register(
DOMAIN, SERVICE_SET_INTERVAL, setinterval, schema=SERVICE_SCHEMA
)
# Tells the bootstrapper that the component was successfully initialized
return True
class Icloud(DeviceScanner):
"""Representation of an iCloud account."""
def __init__(
self, hass, username, password, name, max_interval, gps_accuracy_threshold, see
):
"""Initialize an iCloud account."""
self.hass = hass
self.username = username
self.password = password
self.api = None
self.accountname = name
self.devices = {}
self.seen_devices = {}
self._overridestates = {}
self._intervals = {}
self._max_interval = max_interval
self._gps_accuracy_threshold = gps_accuracy_threshold
self.see = see
self._trusted_device = None
self._verification_code = None
self._attrs = {}
self._attrs[ATTR_ACCOUNTNAME] = name
self.reset_account_icloud()
randomseconds = random.randint(10, 59)
track_utc_time_change(self.hass, self.keep_alive, second=randomseconds)
def reset_account_icloud(self):
"""Reset an iCloud account."""
icloud_dir = self.hass.config.path("icloud")
if not os.path.exists(icloud_dir):
os.makedirs(icloud_dir)
try:
self.api = PyiCloudService(
self.username, self.password, cookie_directory=icloud_dir, verify=True
)
except PyiCloudFailedLoginException as error:
self.api = None
_LOGGER.error("Error logging into iCloud Service: %s", error)
return
try:
self.devices = {}
self._overridestates = {}
self._intervals = {}
for device in self.api.devices:
status = device.status(DEVICESTATUSSET)
_LOGGER.debug("Device Status is %s", status)
devicename = slugify(status["name"].replace(" ", "", 99))
_LOGGER.info("Adding icloud device: %s", devicename)
if devicename in self.devices:
_LOGGER.error("Multiple devices with name: %s", devicename)
continue
self.devices[devicename] = device
self._intervals[devicename] = 1
self._overridestates[devicename] = None
except PyiCloudNoDevicesException:
_LOGGER.error("No iCloud Devices found!")
def icloud_trusted_device_callback(self, callback_data):
"""Handle chosen trusted devices."""
self._trusted_device = int(callback_data.get("trusted_device"))
self._trusted_device = self.api.trusted_devices[self._trusted_device]
if not self.api.send_verification_code(self._trusted_device):
_LOGGER.error("Failed to send verification code")
self._trusted_device = None
return
if self.accountname in _CONFIGURING:
request_id = _CONFIGURING.pop(self.accountname)
configurator = self.hass.components.configurator
configurator.request_done(request_id)
# Trigger the next step immediately
self.icloud_need_verification_code()
def icloud_need_trusted_device(self):
"""We need a trusted device."""
configurator = self.hass.components.configurator
if self.accountname in _CONFIGURING:
return
devicesstring = ""
devices = self.api.trusted_devices
for i, device in enumerate(devices):
devicename = device.get(
"deviceName", "SMS to %s" % device.get("phoneNumber")
)
devicesstring += f"{i}: {devicename};"
_CONFIGURING[self.accountname] = configurator.request_config(
f"iCloud {self.accountname}",
self.icloud_trusted_device_callback,
description=(
"Please choose your trusted device by entering"
" the index from this list: " + devicesstring
),
entity_picture="/static/images/config_icloud.png",
submit_caption="Confirm",
fields=[{"id": "trusted_device", "name": "Trusted Device"}],
async def async_added_to_hass(self):
"""Register state update callback."""
self._unsub_dispatcher = async_dispatcher_connect(
self.hass, TRACKER_UPDATE, self.async_write_ha_state
)
def icloud_verification_callback(self, callback_data):
"""Handle the chosen trusted device."""
self._verification_code = callback_data.get("code")
async def async_will_remove_from_hass(self):
"""Clean up after entity before removal."""
self._unsub_dispatcher()
try:
if not self.api.validate_verification_code(
self._trusted_device, self._verification_code
):
raise PyiCloudException("Unknown failure")
except PyiCloudException as error:
# Reset to the initial 2FA state to allow the user to retry
_LOGGER.error("Failed to verify verification code: %s", error)
self._trusted_device = None
self._verification_code = None
# Trigger the next step immediately
self.icloud_need_trusted_device()
def icon_for_icloud_device(icloud_device: IcloudDevice) -> str:
"""Return a battery icon valid identifier."""
switcher = {
"iPad": "mdi:tablet-ipad",
"iPhone": "mdi:cellphone-iphone",
"iPod": "mdi:ipod",
"iMac": "mdi:desktop-mac",
"MacBookPro": "mdi:laptop-mac",
}
if self.accountname in _CONFIGURING:
request_id = _CONFIGURING.pop(self.accountname)
configurator = self.hass.components.configurator
configurator.request_done(request_id)
def icloud_need_verification_code(self):
"""Return the verification code."""
configurator = self.hass.components.configurator
if self.accountname in _CONFIGURING:
return
_CONFIGURING[self.accountname] = configurator.request_config(
f"iCloud {self.accountname}",
self.icloud_verification_callback,
description=("Please enter the validation code:"),
entity_picture="/static/images/config_icloud.png",
submit_caption="Confirm",
fields=[{"id": "code", "name": "code"}],
)
def keep_alive(self, now):
"""Keep the API alive."""
if self.api is None:
self.reset_account_icloud()
if self.api is None:
return
if self.api.requires_2fa:
try:
if self._trusted_device is None:
self.icloud_need_trusted_device()
return
if self._verification_code is None:
self.icloud_need_verification_code()
return
self.api.authenticate()
if self.api.requires_2fa:
raise Exception("Unknown failure")
self._trusted_device = None
self._verification_code = None
except PyiCloudException as error:
_LOGGER.error("Error setting up 2FA: %s", error)
else:
self.api.authenticate()
currentminutes = dt_util.now().hour * 60 + dt_util.now().minute
try:
for devicename in self.devices:
interval = self._intervals.get(devicename, 1)
if (currentminutes % interval == 0) or (
interval > 10 and currentminutes % interval in [2, 4]
):
self.update_device(devicename)
except ValueError:
_LOGGER.debug("iCloud API returned an error")
def determine_interval(self, devicename, latitude, longitude, battery):
"""Calculate new interval."""
currentzone = run_callback_threadsafe(
self.hass.loop, async_active_zone, self.hass, latitude, longitude
).result()
if (
currentzone is not None
and currentzone == self._overridestates.get(devicename)
) or (currentzone is None and self._overridestates.get(devicename) == "away"):
return
zones = (
self.hass.states.get(entity_id)
for entity_id in sorted(self.hass.states.entity_ids("zone"))
)
distances = []
for zone_state in zones:
zone_state_lat = zone_state.attributes["latitude"]
zone_state_long = zone_state.attributes["longitude"]
zone_distance = distance(
latitude, longitude, zone_state_lat, zone_state_long
)
distances.append(round(zone_distance / 1000, 1))
if distances:
mindistance = min(distances)
else:
mindistance = None
self._overridestates[devicename] = None
if currentzone is not None:
self._intervals[devicename] = self._max_interval
return
if mindistance is None:
return
# Calculate out how long it would take for the device to drive to the
# nearest zone at 120 km/h:
interval = round(mindistance / 2, 0)
# Never poll more than once per minute
interval = max(interval, 1)
if interval > 180:
# Three hour drive? This is far enough that they might be flying
interval = 30
if battery is not None and battery <= 33 and mindistance > 3:
# Low battery - let's check half as often
interval = interval * 2
self._intervals[devicename] = interval
def update_device(self, devicename):
"""Update the device_tracker entity."""
# An entity will not be created by see() when track=false in
# 'known_devices.yaml', but we need to see() it at least once
entity = self.hass.states.get(ENTITY_ID_FORMAT.format(devicename))
if entity is None and devicename in self.seen_devices:
return
attrs = {}
kwargs = {}
if self.api is None:
return
try:
for device in self.api.devices:
if str(device) != str(self.devices[devicename]):
continue
status = device.status(DEVICESTATUSSET)
_LOGGER.debug("Device Status is %s", status)
dev_id = status["name"].replace(" ", "", 99)
dev_id = slugify(dev_id)
attrs[ATTR_DEVICESTATUS] = DEVICESTATUSCODES.get(
status["deviceStatus"], "error"
)
attrs[ATTR_LOWPOWERMODE] = status["lowPowerMode"]
attrs[ATTR_BATTERYSTATUS] = status["batteryStatus"]
attrs[ATTR_ACCOUNTNAME] = self.accountname
status = device.status(DEVICESTATUSSET)
battery = status.get("batteryLevel", 0) * 100
location = status["location"]
if location and location["horizontalAccuracy"]:
horizontal_accuracy = int(location["horizontalAccuracy"])
if horizontal_accuracy < self._gps_accuracy_threshold:
self.determine_interval(
devicename,
location["latitude"],
location["longitude"],
battery,
)
interval = self._intervals.get(devicename, 1)
attrs[ATTR_INTERVAL] = interval
accuracy = location["horizontalAccuracy"]
kwargs["dev_id"] = dev_id
kwargs["host_name"] = status["name"]
kwargs["gps"] = (location["latitude"], location["longitude"])
kwargs["battery"] = battery
kwargs["gps_accuracy"] = accuracy
kwargs[ATTR_ATTRIBUTES] = attrs
self.see(**kwargs)
self.seen_devices[devicename] = True
except PyiCloudNoDevicesException:
_LOGGER.error("No iCloud Devices found")
def lost_iphone(self, devicename):
"""Call the lost iPhone function if the device is found."""
if self.api is None:
return
self.api.authenticate()
for device in self.api.devices:
if str(device) == str(self.devices[devicename]):
_LOGGER.info("Playing Lost iPhone sound for %s", devicename)
device.play_sound()
def update_icloud(self, devicename=None):
"""Request device information from iCloud and update device_tracker."""
if self.api is None:
return
try:
if devicename is not None:
if devicename in self.devices:
self.update_device(devicename)
else:
_LOGGER.error(
"devicename %s unknown for account %s",
devicename,
self._attrs[ATTR_ACCOUNTNAME],
)
else:
for device in self.devices:
self.update_device(device)
except PyiCloudNoDevicesException:
_LOGGER.error("No iCloud Devices found")
def setinterval(self, interval=None, devicename=None):
"""Set the interval of the given devices."""
devs = [devicename] if devicename else self.devices
for device in devs:
devid = f"{DOMAIN}.{device}"
devicestate = self.hass.states.get(devid)
if interval is not None:
if devicestate is not None:
self._overridestates[device] = run_callback_threadsafe(
self.hass.loop,
async_active_zone,
self.hass,
float(devicestate.attributes.get("latitude", 0)),
float(devicestate.attributes.get("longitude", 0)),
).result()
if self._overridestates[device] is None:
self._overridestates[device] = "away"
self._intervals[device] = interval
else:
self._overridestates[device] = None
self.update_device(device)
return switcher.get(icloud_device.device_class, "mdi:cellphone-link")

View File

@ -1,10 +1,13 @@
{
"domain": "icloud",
"name": "Icloud",
"documentation": "https://www.home-assistant.io/integrations/icloud",
"name": "iCloud",
"config_flow": true,
"documentation": "https://www.home-assistant.io/components/icloud",
"requirements": [
"pyicloud==0.9.1"
],
"dependencies": ["configurator"],
"codeowners": []
}
"dependencies": [],
"codeowners": [
"@Quentame"
]
}

View File

@ -1,39 +1,49 @@
lost_iphone:
description: Service to play the lost iphone sound on an iDevice.
fields:
account_name:
description: Name of the account in the config that will be used to look for the device. This is optional, if it isn't given it will use all accounts.
example: 'bart'
device_name:
description: Name of the device that will play the sound. This is optional, if it isn't given it will play on all devices for the given account.
example: 'iphonebart'
set_interval:
description: Service to set the interval of an iDevice.
fields:
account_name:
description: Name of the account in the config that will be used to look for the device. This is optional, if it isn't given it will use all accounts.
example: 'bart'
device_name:
description: Name of the device that will get a new interval. This is optional, if it isn't given it will change the interval for all devices for the given account.
example: 'iphonebart'
interval:
description: The interval (in minutes) that the iDevice will have until the according device_tracker entity changes from zone or until this service is used again. This is optional, if it isn't given the interval of the device will revert back to the original interval based on the current state.
example: 1
update:
description: Service to ask for an update of an iDevice.
description: Update iCloud devices.
fields:
account_name:
description: Name of the account in the config that will be used to look for the device. This is optional, if it isn't given it will use all accounts.
example: 'bart'
device_name:
description: Name of the device that will be updated. This is optional, if it isn't given it will update all devices for the given account.
example: 'iphonebart'
account:
description: Your iCloud account username (email) or account name.
example: 'steve@apple.com'
reset_account:
description: Service to restart an iCloud account. Helpful when not all devices are found after initializing or when you add a new device.
play_sound:
description: Play sound on an Apple device.
fields:
account_name:
description: Name of the account in the config that will be restarted. This is optional, if it isn't given it will restart all accounts.
example: 'bart'
account:
description: (required) Your iCloud account username (email) or account name.
example: 'steve@apple.com'
device_name:
description: (required) The name of the Apple device to play a sound.
example: 'stevesiphone'
display_message:
description: Display a message on an Apple device.
fields:
account:
description: (required) Your iCloud account username (email) or account name.
example: 'steve@apple.com'
device_name:
description: (required) The name of the Apple device to display the message.
example: 'stevesiphone'
message:
description: (required) The content of your message.
example: 'Hey Steve !'
sound:
description: To make a sound when displaying the message (boolean).
example: 'true'
lost_device:
description: Make an Apple device in lost state.
fields:
account:
description: (required) Your iCloud account username (email) or account name.
example: 'steve@apple.com'
device_name:
description: (required) The name of the Apple device to set lost.
example: 'stevesiphone'
number:
description: (required) The phone number to call in lost mode (must contain country code).
example: '+33450020100'
message:
description: (required) The message to display in lost mode.
example: 'Call me'

View File

@ -0,0 +1,38 @@
{
"config": {
"title": "Apple iCloud",
"step": {
"user": {
"title": "iCloud credentials",
"description": "Enter your credentials",
"data": {
"username": "Email",
"password": "Password"
}
},
"trusted_device": {
"title": "iCloud trusted device",
"description": "Select your trusted device",
"data": {
"trusted_device": "Trusted device"
}
},
"verification_code": {
"title": "iCloud verification code",
"description": "Please enter the verification code you just received from iCloud",
"data": {
"verification_code": "Verification code"
}
}
},
"error":{
"username_exists": "Account already configured",
"login": "Login error: please check your email & password",
"send_verification_code": "Failed to send verification code",
"validate_verification_code": "Failed to verify your verification code, choose a trust device and start the verification again",
},
"abort":{
"username_exists": "Account already configured"
}
}
}

View File

@ -36,6 +36,7 @@ FLOWS = [
"huawei_lte",
"hue",
"iaqualink",
"icloud",
"ifttt",
"ios",
"ipma",

View File

@ -432,6 +432,9 @@ pyheos==0.6.0
# homeassistant.components.homematic
pyhomematic==0.1.62
# homeassistant.components.icloud
pyicloud==0.9.1
# homeassistant.components.ipma
pyipma==1.2.1

View File

@ -0,0 +1 @@
"""Tests for the iCloud component."""

View File

@ -0,0 +1,309 @@
"""Tests for the iCloud config flow."""
from unittest.mock import patch, Mock, MagicMock
import pytest
from pyicloud.exceptions import PyiCloudFailedLoginException
from homeassistant import data_entry_flow
from homeassistant.components.icloud import config_flow
from homeassistant.components.icloud.config_flow import (
CONF_TRUSTED_DEVICE,
CONF_VERIFICATION_CODE,
)
from homeassistant.components.icloud.const import (
DOMAIN,
CONF_ACCOUNT_NAME,
CONF_GPS_ACCURACY_THRESHOLD,
CONF_MAX_INTERVAL,
DEFAULT_GPS_ACCURACY_THRESHOLD,
DEFAULT_MAX_INTERVAL,
)
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.helpers.typing import HomeAssistantType
from tests.common import MockConfigEntry
USERNAME = "username@me.com"
PASSWORD = "password"
ACCOUNT_NAME = "Account name 1 2 3"
ACCOUNT_NAME_FROM_USERNAME = None
MAX_INTERVAL = 15
GPS_ACCURACY_THRESHOLD = 250
TRUSTED_DEVICES = [
{"deviceType": "SMS", "areaCode": "", "phoneNumber": "*******58", "deviceId": "1"}
]
@pytest.fixture(name="service")
def mock_controller_service():
"""Mock a successful service."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = True
yield service_mock
@pytest.fixture(name="service_with_cookie")
def mock_controller_service_with_cookie():
"""Mock a successful service while already authenticate."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=True)
service_mock.return_value.validate_verification_code = Mock(return_value=True)
yield service_mock
@pytest.fixture(name="service_send_verification_code_failed")
def mock_controller_service_send_verification_code_failed():
"""Mock a failed service during sending verification code step."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=False)
yield service_mock
@pytest.fixture(name="service_validate_verification_code_failed")
def mock_controller_service_validate_verification_code_failed():
"""Mock a failed service during validation of verification code step."""
with patch(
"homeassistant.components.icloud.config_flow.PyiCloudService"
) as service_mock:
service_mock.return_value.requires_2fa = False
service_mock.return_value.trusted_devices = TRUSTED_DEVICES
service_mock.return_value.send_verification_code = Mock(return_value=True)
service_mock.return_value.validate_verification_code = Mock(return_value=False)
yield service_mock
def init_config_flow(hass: HomeAssistantType):
"""Init a configuration flow."""
flow = config_flow.IcloudFlowHandler()
flow.hass = hass
return flow
async def test_user(hass: HomeAssistantType, service: MagicMock):
"""Test user config."""
flow = init_config_flow(hass)
result = await flow.async_step_user()
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
# test with all provided
result = await flow.async_step_user(
{CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == CONF_TRUSTED_DEVICE
async def test_user_with_cookie(
hass: HomeAssistantType, service_with_cookie: MagicMock
):
"""Test user config with presence of a cookie."""
flow = init_config_flow(hass)
# test with all provided
result = await flow.async_step_user(
{CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == USERNAME
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME_FROM_USERNAME
assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL
assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD
async def test_import(hass: HomeAssistantType, service: MagicMock):
"""Test import step."""
flow = init_config_flow(hass)
# import with username and password
result = await flow.async_step_import(
{CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "trusted_device"
# import with all
result = await flow.async_step_import(
{
CONF_USERNAME: USERNAME,
CONF_PASSWORD: PASSWORD,
CONF_ACCOUNT_NAME: ACCOUNT_NAME,
CONF_MAX_INTERVAL: MAX_INTERVAL,
CONF_GPS_ACCURACY_THRESHOLD: GPS_ACCURACY_THRESHOLD,
}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "trusted_device"
async def test_import_with_cookie(
hass: HomeAssistantType, service_with_cookie: MagicMock
):
"""Test import step with presence of a cookie."""
flow = init_config_flow(hass)
# import with username and password
result = await flow.async_step_import(
{CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == USERNAME
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME_FROM_USERNAME
assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL
assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD
# import with all
result = await flow.async_step_import(
{
CONF_USERNAME: USERNAME,
CONF_PASSWORD: PASSWORD,
CONF_ACCOUNT_NAME: ACCOUNT_NAME,
CONF_MAX_INTERVAL: MAX_INTERVAL,
CONF_GPS_ACCURACY_THRESHOLD: GPS_ACCURACY_THRESHOLD,
}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == USERNAME
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_ACCOUNT_NAME] == ACCOUNT_NAME
assert result["data"][CONF_MAX_INTERVAL] == MAX_INTERVAL
assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == GPS_ACCURACY_THRESHOLD
async def test_abort_if_already_setup(hass: HomeAssistantType):
"""Test we abort if the account is already setup."""
flow = init_config_flow(hass)
MockConfigEntry(
domain=DOMAIN, data={CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
).add_to_hass(hass)
# Should fail, same USERNAME (import)
result = await flow.async_step_import(
{CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "username_exists"
# Should fail, same ACCOUNT_NAME (import)
result = await flow.async_step_import(
{
CONF_USERNAME: "other_username@icloud.com",
CONF_PASSWORD: PASSWORD,
CONF_ACCOUNT_NAME: ACCOUNT_NAME_FROM_USERNAME,
}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "username_exists"
# Should fail, same USERNAME (flow)
result = await flow.async_step_user(
{CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {CONF_USERNAME: "username_exists"}
async def test_login_failed(hass: HomeAssistantType):
"""Test when we have errors during login."""
flow = init_config_flow(hass)
with patch(
"pyicloud.base.PyiCloudService.authenticate",
side_effect=PyiCloudFailedLoginException(),
):
result = await flow.async_step_user(
{CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {CONF_USERNAME: "login"}
async def test_trusted_device(hass: HomeAssistantType, service: MagicMock):
"""Test trusted_device step."""
flow = init_config_flow(hass)
await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD})
result = await flow.async_step_trusted_device()
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == CONF_TRUSTED_DEVICE
async def test_trusted_device_success(hass: HomeAssistantType, service: MagicMock):
"""Test trusted_device step success."""
flow = init_config_flow(hass)
await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD})
result = await flow.async_step_trusted_device({CONF_TRUSTED_DEVICE: 0})
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == CONF_VERIFICATION_CODE
async def test_send_verification_code_failed(
hass: HomeAssistantType, service_send_verification_code_failed: MagicMock
):
"""Test when we have errors during send_verification_code."""
flow = init_config_flow(hass)
await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD})
result = await flow.async_step_trusted_device({CONF_TRUSTED_DEVICE: 0})
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == CONF_TRUSTED_DEVICE
assert result["errors"] == {CONF_TRUSTED_DEVICE: "send_verification_code"}
async def test_verification_code(hass: HomeAssistantType):
"""Test verification_code step."""
flow = init_config_flow(hass)
await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD})
result = await flow.async_step_verification_code()
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == CONF_VERIFICATION_CODE
async def test_verification_code_success(
hass: HomeAssistantType, service_with_cookie: MagicMock
):
"""Test verification_code step success."""
flow = init_config_flow(hass)
await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD})
result = await flow.async_step_verification_code({CONF_VERIFICATION_CODE: 0})
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == USERNAME
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_ACCOUNT_NAME] is None
assert result["data"][CONF_MAX_INTERVAL] == DEFAULT_MAX_INTERVAL
assert result["data"][CONF_GPS_ACCURACY_THRESHOLD] == DEFAULT_GPS_ACCURACY_THRESHOLD
async def test_validate_verification_code_failed(
hass: HomeAssistantType, service_validate_verification_code_failed: MagicMock
):
"""Test when we have errors during validate_verification_code."""
flow = init_config_flow(hass)
await flow.async_step_user({CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD})
result = await flow.async_step_verification_code({CONF_VERIFICATION_CODE: 0})
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == CONF_TRUSTED_DEVICE
assert result["errors"] == {"base": "validate_verification_code"}