Add Withings support (#25154)

* Rebasing with a clean branch.
Addressing PR feedback.
Cleaning up some static code checks.
Fixing bug with saving credentials.

* Removing unecessary change.

* Caching data manager.

* Removing configurable measures.

* Using import step in config flow.

* Updating config flows.

* Addressing PR feedback.

* Formatting source.

* Addressing PR feedback and rebasing.
This commit is contained in:
Robert Van Gorkom 2019-08-31 05:30:59 -07:00 committed by Martin Hjelmare
parent 944b544b2e
commit 614cf74225
19 changed files with 2566 additions and 0 deletions

View File

@ -301,6 +301,7 @@ homeassistant/components/weather/* @fabaff
homeassistant/components/weblink/* @home-assistant/core
homeassistant/components/websocket_api/* @home-assistant/core
homeassistant/components/wemo/* @sqldiablo
homeassistant/components/withings/* @vangorra
homeassistant/components/worldclock/* @fabaff
homeassistant/components/wwlln/* @bachya
homeassistant/components/xfinity/* @cisasteelersfan

View File

@ -0,0 +1,99 @@
"""
Support for the Withings API.
For more details about this platform, please refer to the documentation at
"""
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry, SOURCE_IMPORT, SOURCE_USER
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from homeassistant.helpers import config_validation as cv
from . import config_flow, const
from .common import _LOGGER, get_data_manager, NotAuthenticatedError
DOMAIN = const.DOMAIN
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(const.CLIENT_ID): vol.All(cv.string, vol.Length(min=1)),
vol.Required(const.CLIENT_SECRET): vol.All(
cv.string, vol.Length(min=1)
),
vol.Optional(const.BASE_URL): cv.url,
vol.Required(const.PROFILES): vol.All(
cv.ensure_list,
vol.Unique(),
vol.Length(min=1),
[vol.All(cv.string, vol.Length(min=1))],
),
}
)
},
extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass: HomeAssistantType, config: ConfigType):
"""Set up the Withings component."""
conf = config.get(DOMAIN)
if not conf:
return True
hass.data[DOMAIN] = {const.CONFIG: conf}
base_url = conf.get(const.BASE_URL, hass.config.api.base_url).rstrip("/")
hass.http.register_view(config_flow.WithingsAuthCallbackView)
config_flow.register_flow_implementation(
hass,
conf[const.CLIENT_ID],
conf[const.CLIENT_SECRET],
base_url,
conf[const.PROFILES],
)
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data={}
)
)
return True
async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry):
"""Set up Withings from a config entry."""
data_manager = get_data_manager(hass, entry)
_LOGGER.debug("Confirming we're authenticated")
try:
await data_manager.check_authenticated()
except NotAuthenticatedError:
# Trigger new config flow.
hass.async_create_task(
hass.config_entries.flow.async_init(
const.DOMAIN,
context={"source": SOURCE_USER, const.PROFILE: data_manager.profile},
data={},
)
)
return False
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, "sensor")
)
return True
async def async_unload_entry(hass: HomeAssistantType, entry: ConfigEntry):
"""Unload Withings config entry."""
await hass.async_create_task(
hass.config_entries.async_forward_entry_unload(entry, "sensor")
)
return True

View File

@ -0,0 +1,308 @@
"""Common code for Withings."""
import datetime
import logging
import re
import time
import nokia
from oauthlib.oauth2.rfc6749.errors import MissingTokenError
from requests_oauthlib import TokenUpdated
from homeassistant.config_entries import ConfigEntry
from homeassistant.exceptions import HomeAssistantError, PlatformNotReady
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import dt, slugify
from . import const
_LOGGER = logging.getLogger(const.LOG_NAMESPACE)
NOT_AUTHENTICATED_ERROR = re.compile(
".*(Error Code (100|101|102|200|401)|Missing access token parameter).*",
re.IGNORECASE,
)
class NotAuthenticatedError(HomeAssistantError):
"""Raise when not authenticated with the service."""
pass
class ServiceError(HomeAssistantError):
"""Raise when the service has an error."""
pass
class ThrottleData:
"""Throttle data."""
def __init__(self, interval: int, data):
"""Constructor."""
self._time = int(time.time())
self._interval = interval
self._data = data
@property
def time(self):
"""Get time created."""
return self._time
@property
def interval(self):
"""Get interval."""
return self._interval
@property
def data(self):
"""Get data."""
return self._data
def is_expired(self):
"""Is this data expired."""
return int(time.time()) - self.time > self.interval
class WithingsDataManager:
"""A class representing an Withings cloud service connection."""
service_available = None
def __init__(self, hass: HomeAssistantType, profile: str, api: nokia.NokiaApi):
"""Constructor."""
self._hass = hass
self._api = api
self._profile = profile
self._slug = slugify(profile)
self._measures = None
self._sleep = None
self._sleep_summary = None
self.sleep_summary_last_update_parameter = None
self.throttle_data = {}
@property
def profile(self) -> str:
"""Get the profile."""
return self._profile
@property
def slug(self) -> str:
"""Get the slugified profile the data is for."""
return self._slug
@property
def api(self):
"""Get the api object."""
return self._api
@property
def measures(self):
"""Get the current measures data."""
return self._measures
@property
def sleep(self):
"""Get the current sleep data."""
return self._sleep
@property
def sleep_summary(self):
"""Get the current sleep summary data."""
return self._sleep_summary
@staticmethod
def get_throttle_interval():
"""Get the throttle interval."""
return const.THROTTLE_INTERVAL
def get_throttle_data(self, domain: str) -> ThrottleData:
"""Get throttlel data."""
return self.throttle_data.get(domain)
def set_throttle_data(self, domain: str, throttle_data: ThrottleData):
"""Set throttle data."""
self.throttle_data[domain] = throttle_data
@staticmethod
def print_service_unavailable():
"""Print the service is unavailable (once) to the log."""
if WithingsDataManager.service_available is not False:
_LOGGER.error("Looks like the service is not available at the moment")
WithingsDataManager.service_available = False
return True
@staticmethod
def print_service_available():
"""Print the service is available (once) to to the log."""
if WithingsDataManager.service_available is not True:
_LOGGER.info("Looks like the service is available again")
WithingsDataManager.service_available = True
return True
async def call(self, function, is_first_call=True, throttle_domain=None):
"""Call an api method and handle the result."""
throttle_data = self.get_throttle_data(throttle_domain)
should_throttle = (
throttle_domain and throttle_data and not throttle_data.is_expired()
)
try:
if should_throttle:
_LOGGER.debug("Throttling call for domain: %s", throttle_domain)
result = throttle_data.data
else:
_LOGGER.debug("Running call.")
result = await self._hass.async_add_executor_job(function)
# Update throttle data.
self.set_throttle_data(
throttle_domain, ThrottleData(self.get_throttle_interval(), result)
)
WithingsDataManager.print_service_available()
return result
except TokenUpdated:
WithingsDataManager.print_service_available()
if not is_first_call:
raise ServiceError(
"Stuck in a token update loop. This should never happen"
)
_LOGGER.info("Token updated, re-running call.")
return await self.call(function, False, throttle_domain)
except MissingTokenError as ex:
raise NotAuthenticatedError(ex)
except Exception as ex: # pylint: disable=broad-except
# Service error, probably not authenticated.
if NOT_AUTHENTICATED_ERROR.match(str(ex)):
raise NotAuthenticatedError(ex)
# Probably a network error.
WithingsDataManager.print_service_unavailable()
raise PlatformNotReady(ex)
async def check_authenticated(self):
"""Check if the user is authenticated."""
def function():
return self._api.request("user", "getdevice", version="v2")
return await self.call(function)
async def update_measures(self):
"""Update the measures data."""
def function():
return self._api.get_measures()
self._measures = await self.call(function, throttle_domain="update_measures")
return self._measures
async def update_sleep(self):
"""Update the sleep data."""
end_date = int(time.time())
start_date = end_date - (6 * 60 * 60)
def function():
return self._api.get_sleep(startdate=start_date, enddate=end_date)
self._sleep = await self.call(function, throttle_domain="update_sleep")
return self._sleep
async def update_sleep_summary(self):
"""Update the sleep summary data."""
now = dt.utcnow()
yesterday = now - datetime.timedelta(days=1)
yesterday_noon = datetime.datetime(
yesterday.year,
yesterday.month,
yesterday.day,
12,
0,
0,
0,
datetime.timezone.utc,
)
_LOGGER.debug(
"Getting sleep summary data since: %s",
yesterday.strftime("%Y-%m-%d %H:%M:%S UTC"),
)
def function():
return self._api.get_sleep_summary(lastupdate=yesterday_noon.timestamp())
self._sleep_summary = await self.call(
function, throttle_domain="update_sleep_summary"
)
return self._sleep_summary
def create_withings_data_manager(
hass: HomeAssistantType, entry: ConfigEntry
) -> WithingsDataManager:
"""Set up the sensor config entry."""
entry_creds = entry.data.get(const.CREDENTIALS) or {}
profile = entry.data[const.PROFILE]
credentials = nokia.NokiaCredentials(
entry_creds.get("access_token"),
entry_creds.get("token_expiry"),
entry_creds.get("token_type"),
entry_creds.get("refresh_token"),
entry_creds.get("user_id"),
entry_creds.get("client_id"),
entry_creds.get("consumer_secret"),
)
def credentials_saver(credentials_param):
_LOGGER.debug("Saving updated credentials of type %s", type(credentials_param))
# Sanitizing the data as sometimes a NokiaCredentials object
# is passed through from the API.
cred_data = credentials_param
if not isinstance(credentials_param, dict):
cred_data = credentials_param.__dict__
entry.data[const.CREDENTIALS] = cred_data
hass.config_entries.async_update_entry(entry, data={**entry.data})
_LOGGER.debug("Creating nokia api instance")
api = nokia.NokiaApi(
credentials, refresh_cb=(lambda token: credentials_saver(api.credentials))
)
_LOGGER.debug("Creating withings data manager for profile: %s", profile)
return WithingsDataManager(hass, profile, api)
def get_data_manager(
hass: HomeAssistantType, entry: ConfigEntry
) -> WithingsDataManager:
"""Get a data manager for a config entry.
If the data manager doesn't exist yet, it will be
created and cached for later use.
"""
profile = entry.data.get(const.PROFILE)
if not hass.data.get(const.DOMAIN):
hass.data[const.DOMAIN] = {}
if not hass.data[const.DOMAIN].get(const.DATA_MANAGER):
hass.data[const.DOMAIN][const.DATA_MANAGER] = {}
if not hass.data[const.DOMAIN][const.DATA_MANAGER].get(profile):
hass.data[const.DOMAIN][const.DATA_MANAGER][
profile
] = create_withings_data_manager(hass, entry)
return hass.data[const.DOMAIN][const.DATA_MANAGER][profile]

View File

@ -0,0 +1,189 @@
"""Config flow for Withings."""
from collections import OrderedDict
import logging
from typing import Optional
import aiohttp
import nokia
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.http import HomeAssistantView
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import callback
from . import const
DATA_FLOW_IMPL = "withings_flow_implementation"
_LOGGER = logging.getLogger(__name__)
@callback
def register_flow_implementation(hass, client_id, client_secret, base_url, profiles):
"""Register a flow implementation.
hass: Home assistant object.
client_id: Client id.
client_secret: Client secret.
base_url: Base url of home assistant instance.
profiles: The profiles to work with.
"""
if DATA_FLOW_IMPL not in hass.data:
hass.data[DATA_FLOW_IMPL] = OrderedDict()
hass.data[DATA_FLOW_IMPL] = {
const.CLIENT_ID: client_id,
const.CLIENT_SECRET: client_secret,
const.BASE_URL: base_url,
const.PROFILES: profiles,
}
@config_entries.HANDLERS.register(const.DOMAIN)
class WithingsFlowHandler(config_entries.ConfigFlow):
"""Handle a config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL
def __init__(self):
"""Initialize flow."""
self.flow_profile = None
self.data = None
def async_profile_config_entry(self, profile: str) -> Optional[ConfigEntry]:
"""Get a profile config entry."""
entries = self.hass.config_entries.async_entries(const.DOMAIN)
for entry in entries:
if entry.data.get(const.PROFILE) == profile:
return entry
return None
def get_auth_client(self, profile: str):
"""Get a new auth client."""
flow = self.hass.data[DATA_FLOW_IMPL]
client_id = flow[const.CLIENT_ID]
client_secret = flow[const.CLIENT_SECRET]
base_url = flow[const.BASE_URL].rstrip("/")
callback_uri = "{}/{}?flow_id={}&profile={}".format(
base_url.rstrip("/"),
const.AUTH_CALLBACK_PATH.lstrip("/"),
self.flow_id,
profile,
)
return nokia.NokiaAuth(
client_id,
client_secret,
callback_uri,
scope=",".join(["user.info", "user.metrics", "user.activity"]),
)
async def async_step_import(self, user_input=None):
"""Create user step."""
return await self.async_step_user(user_input)
async def async_step_user(self, user_input=None):
"""Create an entry for selecting a profile."""
flow = self.hass.data.get(DATA_FLOW_IMPL, {})
if user_input:
return await self.async_step_auth(user_input)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{vol.Required(const.PROFILE): vol.In(flow.get(const.PROFILES))}
),
)
async def async_step_auth(self, user_input=None):
"""Create an entry for auth."""
if user_input.get(const.CODE):
self.data = user_input
return self.async_external_step_done(next_step_id="finish")
profile = user_input.get(const.PROFILE)
auth_client = self.get_auth_client(profile)
url = auth_client.get_authorize_url()
return self.async_external_step(step_id="auth", url=url)
async def async_step_finish(self, user_input=None):
"""Received code for authentication."""
data = user_input or self.data or {}
_LOGGER.debug(
"Should close all flows below %s",
self.hass.config_entries.flow.async_progress(),
)
profile = data[const.PROFILE]
code = data[const.CODE]
return await self._async_create_session(profile, code)
async def _async_create_session(self, profile, code):
"""Create withings session and entries."""
auth_client = self.get_auth_client(profile)
_LOGGER.debug("Requesting credentials with code: %s.", code)
credentials = auth_client.get_credentials(code)
return self.async_create_entry(
title=profile,
data={const.PROFILE: profile, const.CREDENTIALS: credentials.__dict__},
)
class WithingsAuthCallbackView(HomeAssistantView):
"""Withings Authorization Callback View."""
requires_auth = False
url = const.AUTH_CALLBACK_PATH
name = const.AUTH_CALLBACK_NAME
def __init__(self):
"""Constructor."""
async def get(self, request):
"""Receive authorization code."""
hass = request.app["hass"]
code = request.query.get("code")
profile = request.query.get("profile")
flow_id = request.query.get("flow_id")
if not flow_id:
return aiohttp.web_response.Response(
status=400, text="'flow_id' argument not provided in url."
)
if not profile:
return aiohttp.web_response.Response(
status=400, text="'profile' argument not provided in url."
)
if not code:
return aiohttp.web_response.Response(
status=400, text="'code' argument not provided in url."
)
try:
await hass.config_entries.flow.async_configure(
flow_id, {const.PROFILE: profile, const.CODE: code}
)
return aiohttp.web_response.Response(
status=200,
headers={"content-type": "text/html"},
text="<script>window.close()</script>",
)
except data_entry_flow.UnknownFlow:
return aiohttp.web_response.Response(status=400, text="Unknown flow")

View File

@ -0,0 +1,103 @@
"""Constants used by the Withings component."""
import homeassistant.const as const
DATA_MANAGER = "data_manager"
BASE_URL = "base_url"
CLIENT_ID = "client_id"
CLIENT_SECRET = "client_secret"
CODE = "code"
CONFIG = "config"
CREDENTIALS = "credentials"
DOMAIN = "withings"
LOG_NAMESPACE = "homeassistant.components.withings"
MEASURES = "measures"
PROFILE = "profile"
PROFILES = "profiles"
AUTH_CALLBACK_PATH = "/api/withings/authorize"
AUTH_CALLBACK_NAME = "withings:authorize"
THROTTLE_INTERVAL = 60
STATE_UNKNOWN = const.STATE_UNKNOWN
STATE_AWAKE = "awake"
STATE_DEEP = "deep"
STATE_LIGHT = "light"
STATE_REM = "rem"
MEASURE_TYPE_BODY_TEMP = 71
MEASURE_TYPE_BONE_MASS = 88
MEASURE_TYPE_DIASTOLIC_BP = 9
MEASURE_TYPE_FAT_MASS = 8
MEASURE_TYPE_FAT_MASS_FREE = 5
MEASURE_TYPE_FAT_RATIO = 6
MEASURE_TYPE_HEART_PULSE = 11
MEASURE_TYPE_HEIGHT = 4
MEASURE_TYPE_HYDRATION = 77
MEASURE_TYPE_MUSCLE_MASS = 76
MEASURE_TYPE_PWV = 91
MEASURE_TYPE_SKIN_TEMP = 73
MEASURE_TYPE_SLEEP_DEEP_DURATION = "deepsleepduration"
MEASURE_TYPE_SLEEP_HEART_RATE_AVERAGE = "hr_average"
MEASURE_TYPE_SLEEP_HEART_RATE_MAX = "hr_max"
MEASURE_TYPE_SLEEP_HEART_RATE_MIN = "hr_min"
MEASURE_TYPE_SLEEP_LIGHT_DURATION = "lightsleepduration"
MEASURE_TYPE_SLEEP_REM_DURATION = "remsleepduration"
MEASURE_TYPE_SLEEP_RESPIRATORY_RATE_AVERAGE = "rr_average"
MEASURE_TYPE_SLEEP_RESPIRATORY_RATE_MAX = "rr_max"
MEASURE_TYPE_SLEEP_RESPIRATORY_RATE_MIN = "rr_min"
MEASURE_TYPE_SLEEP_STATE_AWAKE = 0
MEASURE_TYPE_SLEEP_STATE_DEEP = 2
MEASURE_TYPE_SLEEP_STATE_LIGHT = 1
MEASURE_TYPE_SLEEP_STATE_REM = 3
MEASURE_TYPE_SLEEP_TOSLEEP_DURATION = "durationtosleep"
MEASURE_TYPE_SLEEP_TOWAKEUP_DURATION = "durationtowakeup"
MEASURE_TYPE_SLEEP_WAKEUP_DURATION = "wakeupduration"
MEASURE_TYPE_SLEEP_WAKUP_COUNT = "wakeupcount"
MEASURE_TYPE_SPO2 = 54
MEASURE_TYPE_SYSTOLIC_BP = 10
MEASURE_TYPE_TEMP = 12
MEASURE_TYPE_WEIGHT = 1
MEAS_BODY_TEMP_C = "body_temperature_c"
MEAS_BONE_MASS_KG = "bone_mass_kg"
MEAS_DIASTOLIC_MMHG = "diastolic_blood_pressure_mmhg"
MEAS_FAT_FREE_MASS_KG = "fat_free_mass_kg"
MEAS_FAT_MASS_KG = "fat_mass_kg"
MEAS_FAT_RATIO_PCT = "fat_ratio_pct"
MEAS_HEART_PULSE_BPM = "heart_pulse_bpm"
MEAS_HEIGHT_M = "height_m"
MEAS_HYDRATION = "hydration"
MEAS_MUSCLE_MASS_KG = "muscle_mass_kg"
MEAS_PWV = "pulse_wave_velocity"
MEAS_SKIN_TEMP_C = "skin_temperature_c"
MEAS_SLEEP_DEEP_DURATION_SECONDS = "sleep_deep_duration_seconds"
MEAS_SLEEP_HEART_RATE_AVERAGE = "sleep_heart_rate_average_bpm"
MEAS_SLEEP_HEART_RATE_MAX = "sleep_heart_rate_max_bpm"
MEAS_SLEEP_HEART_RATE_MIN = "sleep_heart_rate_min_bpm"
MEAS_SLEEP_LIGHT_DURATION_SECONDS = "sleep_light_duration_seconds"
MEAS_SLEEP_REM_DURATION_SECONDS = "sleep_rem_duration_seconds"
MEAS_SLEEP_RESPIRATORY_RATE_AVERAGE = "sleep_respiratory_average_bpm"
MEAS_SLEEP_RESPIRATORY_RATE_MAX = "sleep_respiratory_max_bpm"
MEAS_SLEEP_RESPIRATORY_RATE_MIN = "sleep_respiratory_min_bpm"
MEAS_SLEEP_STATE = "sleep_state"
MEAS_SLEEP_TOSLEEP_DURATION_SECONDS = "sleep_tosleep_duration_seconds"
MEAS_SLEEP_TOWAKEUP_DURATION_SECONDS = "sleep_towakeup_duration_seconds"
MEAS_SLEEP_WAKEUP_COUNT = "sleep_wakeup_count"
MEAS_SLEEP_WAKEUP_DURATION_SECONDS = "sleep_wakeup_duration_seconds"
MEAS_SPO2_PCT = "spo2_pct"
MEAS_SYSTOLIC_MMGH = "systolic_blood_pressure_mmhg"
MEAS_TEMP_C = "temperature_c"
MEAS_WEIGHT_KG = "weight_kg"
UOM_BEATS_PER_MINUTE = "bpm"
UOM_BREATHS_PER_MINUTE = "br/m"
UOM_FREQUENCY = "times"
UOM_METERS_PER_SECOND = "m/s"
UOM_MMHG = "mmhg"
UOM_PERCENT = "%"
UOM_LENGTH_M = const.LENGTH_METERS
UOM_MASS_KG = const.MASS_KILOGRAMS
UOM_SECONDS = "seconds"
UOM_TEMP_C = const.TEMP_CELSIUS

View File

@ -0,0 +1,17 @@
{
"domain": "withings",
"name": "Withings",
"config_flow": true,
"documentation": "https://www.home-assistant.io/components/withings",
"requirements": [
"nokia==1.2.0"
],
"dependencies": [
"api",
"http",
"webhook"
],
"codeowners": [
"@vangorra"
]
}

View File

@ -0,0 +1,460 @@
"""Sensors flow for Withings."""
import typing as types
from homeassistant.config_entries import ConfigEntry
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import slugify
from . import const
from .common import _LOGGER, WithingsDataManager, get_data_manager
# There's only 3 calls (per profile) made to the withings api every 5
# minutes (see throttle values). This component wouldn't benefit
# much from parallel updates.
PARALLEL_UPDATES = 1
async def async_setup_entry(
hass: HomeAssistantType,
entry: ConfigEntry,
async_add_entities: types.Callable[[types.List[Entity], bool], None],
):
"""Set up the sensor config entry."""
data_manager = get_data_manager(hass, entry)
entities = create_sensor_entities(data_manager)
async_add_entities(entities, True)
def get_measures():
"""Get all the measures.
This function exists to be easily mockable so we can test
one measure at a time. This becomes necessary when integration
testing throttle functionality in the data manager.
"""
return list(WITHINGS_MEASUREMENTS_MAP)
def create_sensor_entities(data_manager: WithingsDataManager):
"""Create sensor entities."""
entities = []
measures = get_measures()
for attribute in WITHINGS_ATTRIBUTES:
if attribute.measurement not in measures:
_LOGGER.debug(
"Skipping measurement %s as it is not in the"
"list of measurements to use",
attribute.measurement,
)
continue
_LOGGER.debug(
"Creating entity for measurement: %s, measure_type: %s,"
"friendly_name: %s, unit_of_measurement: %s",
attribute.measurement,
attribute.measure_type,
attribute.friendly_name,
attribute.unit_of_measurement,
)
entity = WithingsHealthSensor(data_manager, attribute)
entities.append(entity)
return entities
class WithingsAttribute:
"""Base class for modeling withing data."""
def __init__(
self,
measurement: str,
measure_type,
friendly_name: str,
unit_of_measurement: str,
icon: str,
) -> None:
"""Constructor."""
self.measurement = measurement
self.measure_type = measure_type
self.friendly_name = friendly_name
self.unit_of_measurement = unit_of_measurement
self.icon = icon
class WithingsMeasureAttribute(WithingsAttribute):
"""Model measure attributes."""
class WithingsSleepStateAttribute(WithingsAttribute):
"""Model sleep data attributes."""
def __init__(
self, measurement: str, friendly_name: str, unit_of_measurement: str, icon: str
) -> None:
"""Constructor."""
super().__init__(measurement, None, friendly_name, unit_of_measurement, icon)
class WithingsSleepSummaryAttribute(WithingsAttribute):
"""Models sleep summary attributes."""
WITHINGS_ATTRIBUTES = [
WithingsMeasureAttribute(
const.MEAS_WEIGHT_KG,
const.MEASURE_TYPE_WEIGHT,
"Weight",
const.UOM_MASS_KG,
"mdi:weight-kilogram",
),
WithingsMeasureAttribute(
const.MEAS_FAT_MASS_KG,
const.MEASURE_TYPE_FAT_MASS,
"Fat Mass",
const.UOM_MASS_KG,
"mdi:weight-kilogram",
),
WithingsMeasureAttribute(
const.MEAS_FAT_FREE_MASS_KG,
const.MEASURE_TYPE_FAT_MASS_FREE,
"Fat Free Mass",
const.UOM_MASS_KG,
"mdi:weight-kilogram",
),
WithingsMeasureAttribute(
const.MEAS_MUSCLE_MASS_KG,
const.MEASURE_TYPE_MUSCLE_MASS,
"Muscle Mass",
const.UOM_MASS_KG,
"mdi:weight-kilogram",
),
WithingsMeasureAttribute(
const.MEAS_BONE_MASS_KG,
const.MEASURE_TYPE_BONE_MASS,
"Bone Mass",
const.UOM_MASS_KG,
"mdi:weight-kilogram",
),
WithingsMeasureAttribute(
const.MEAS_HEIGHT_M,
const.MEASURE_TYPE_HEIGHT,
"Height",
const.UOM_LENGTH_M,
"mdi:ruler",
),
WithingsMeasureAttribute(
const.MEAS_TEMP_C,
const.MEASURE_TYPE_TEMP,
"Temperature",
const.UOM_TEMP_C,
"mdi:thermometer",
),
WithingsMeasureAttribute(
const.MEAS_BODY_TEMP_C,
const.MEASURE_TYPE_BODY_TEMP,
"Body Temperature",
const.UOM_TEMP_C,
"mdi:thermometer",
),
WithingsMeasureAttribute(
const.MEAS_SKIN_TEMP_C,
const.MEASURE_TYPE_SKIN_TEMP,
"Skin Temperature",
const.UOM_TEMP_C,
"mdi:thermometer",
),
WithingsMeasureAttribute(
const.MEAS_FAT_RATIO_PCT,
const.MEASURE_TYPE_FAT_RATIO,
"Fat Ratio",
const.UOM_PERCENT,
None,
),
WithingsMeasureAttribute(
const.MEAS_DIASTOLIC_MMHG,
const.MEASURE_TYPE_DIASTOLIC_BP,
"Diastolic Blood Pressure",
const.UOM_MMHG,
None,
),
WithingsMeasureAttribute(
const.MEAS_SYSTOLIC_MMGH,
const.MEASURE_TYPE_SYSTOLIC_BP,
"Systolic Blood Pressure",
const.UOM_MMHG,
None,
),
WithingsMeasureAttribute(
const.MEAS_HEART_PULSE_BPM,
const.MEASURE_TYPE_HEART_PULSE,
"Heart Pulse",
const.UOM_BEATS_PER_MINUTE,
"mdi:heart-pulse",
),
WithingsMeasureAttribute(
const.MEAS_SPO2_PCT, const.MEASURE_TYPE_SPO2, "SP02", const.UOM_PERCENT, None
),
WithingsMeasureAttribute(
const.MEAS_HYDRATION, const.MEASURE_TYPE_HYDRATION, "Hydration", "", "mdi:water"
),
WithingsMeasureAttribute(
const.MEAS_PWV,
const.MEASURE_TYPE_PWV,
"Pulse Wave Velocity",
const.UOM_METERS_PER_SECOND,
None,
),
WithingsSleepStateAttribute(
const.MEAS_SLEEP_STATE, "Sleep state", None, "mdi:sleep"
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_WAKEUP_DURATION_SECONDS,
const.MEASURE_TYPE_SLEEP_WAKEUP_DURATION,
"Wakeup time",
const.UOM_SECONDS,
"mdi:sleep-off",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_LIGHT_DURATION_SECONDS,
const.MEASURE_TYPE_SLEEP_LIGHT_DURATION,
"Light sleep",
const.UOM_SECONDS,
"mdi:sleep",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_DEEP_DURATION_SECONDS,
const.MEASURE_TYPE_SLEEP_DEEP_DURATION,
"Deep sleep",
const.UOM_SECONDS,
"mdi:sleep",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_REM_DURATION_SECONDS,
const.MEASURE_TYPE_SLEEP_REM_DURATION,
"REM sleep",
const.UOM_SECONDS,
"mdi:sleep",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_WAKEUP_COUNT,
const.MEASURE_TYPE_SLEEP_WAKUP_COUNT,
"Wakeup count",
const.UOM_FREQUENCY,
"mdi:sleep-off",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_TOSLEEP_DURATION_SECONDS,
const.MEASURE_TYPE_SLEEP_TOSLEEP_DURATION,
"Time to sleep",
const.UOM_SECONDS,
"mdi:sleep",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_TOWAKEUP_DURATION_SECONDS,
const.MEASURE_TYPE_SLEEP_TOWAKEUP_DURATION,
"Time to wakeup",
const.UOM_SECONDS,
"mdi:sleep-off",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_HEART_RATE_AVERAGE,
const.MEASURE_TYPE_SLEEP_HEART_RATE_AVERAGE,
"Average heart rate",
const.UOM_BEATS_PER_MINUTE,
"mdi:heart-pulse",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_HEART_RATE_MIN,
const.MEASURE_TYPE_SLEEP_HEART_RATE_MIN,
"Minimum heart rate",
const.UOM_BEATS_PER_MINUTE,
"mdi:heart-pulse",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_HEART_RATE_MAX,
const.MEASURE_TYPE_SLEEP_HEART_RATE_MAX,
"Maximum heart rate",
const.UOM_BEATS_PER_MINUTE,
"mdi:heart-pulse",
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_RESPIRATORY_RATE_AVERAGE,
const.MEASURE_TYPE_SLEEP_RESPIRATORY_RATE_AVERAGE,
"Average respiratory rate",
const.UOM_BREATHS_PER_MINUTE,
None,
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_RESPIRATORY_RATE_MIN,
const.MEASURE_TYPE_SLEEP_RESPIRATORY_RATE_MIN,
"Minimum respiratory rate",
const.UOM_BREATHS_PER_MINUTE,
None,
),
WithingsSleepSummaryAttribute(
const.MEAS_SLEEP_RESPIRATORY_RATE_MAX,
const.MEASURE_TYPE_SLEEP_RESPIRATORY_RATE_MAX,
"Maximum respiratory rate",
const.UOM_BREATHS_PER_MINUTE,
None,
),
]
WITHINGS_MEASUREMENTS_MAP = {attr.measurement: attr for attr in WITHINGS_ATTRIBUTES}
class WithingsHealthSensor(Entity):
"""Implementation of a Withings sensor."""
def __init__(
self, data_manager: WithingsDataManager, attribute: WithingsAttribute
) -> None:
"""Initialize the Withings sensor."""
self._data_manager = data_manager
self._attribute = attribute
self._state = None
self._slug = self._data_manager.slug
self._user_id = self._data_manager.api.get_credentials().user_id
@property
def name(self) -> str:
"""Return the name of the sensor."""
return "Withings {} {}".format(self._attribute.measurement, self._slug)
@property
def unique_id(self) -> str:
"""Return a unique, HASS-friendly identifier for this entity."""
return "withings_{}_{}_{}".format(
self._slug, self._user_id, slugify(self._attribute.measurement)
)
@property
def state(self):
"""Return the state of the sensor."""
return self._state
@property
def unit_of_measurement(self) -> str:
"""Return the unit of measurement of this entity, if any."""
return self._attribute.unit_of_measurement
@property
def icon(self) -> str:
"""Icon to use in the frontend, if any."""
return self._attribute.icon
@property
def device_state_attributes(self):
"""Get withings attributes."""
return self._attribute.__dict__
async def async_update(self) -> None:
"""Update the data."""
_LOGGER.debug(
"Async update slug: %s, measurement: %s, user_id: %s",
self._slug,
self._attribute.measurement,
self._user_id,
)
if isinstance(self._attribute, WithingsMeasureAttribute):
_LOGGER.debug("Updating measures state")
await self._data_manager.update_measures()
await self.async_update_measure(self._data_manager.measures)
elif isinstance(self._attribute, WithingsSleepStateAttribute):
_LOGGER.debug("Updating sleep state")
await self._data_manager.update_sleep()
await self.async_update_sleep_state(self._data_manager.sleep)
elif isinstance(self._attribute, WithingsSleepSummaryAttribute):
_LOGGER.debug("Updating sleep summary state")
await self._data_manager.update_sleep_summary()
await self.async_update_sleep_summary(self._data_manager.sleep_summary)
async def async_update_measure(self, data) -> None:
"""Update the measures data."""
if data is None:
_LOGGER.error("Provided data is None. Setting state to %s", None)
self._state = None
return
measure_type = self._attribute.measure_type
_LOGGER.debug(
"Finding the unambiguous measure group with measure_type: %s", measure_type
)
measure_groups = [
g
for g in data
if (not g.is_ambiguous() and g.get_measure(measure_type) is not None)
]
if not measure_groups:
_LOGGER.warning("No measure groups found, setting state to %s", None)
self._state = None
return
_LOGGER.debug(
"Sorting list of %s measure groups by date created (DESC)",
len(measure_groups),
)
measure_groups.sort(key=(lambda g: g.created), reverse=True)
self._state = round(measure_groups[0].get_measure(measure_type), 4)
async def async_update_sleep_state(self, data) -> None:
"""Update the sleep state data."""
if data is None:
_LOGGER.error("Provided data is None. Setting state to %s", None)
self._state = None
return
if not data.series:
_LOGGER.warning("No sleep data, setting state to %s", None)
self._state = None
return
series = sorted(data.series, key=lambda o: o.enddate, reverse=True)
serie = series[0]
if serie.state == const.MEASURE_TYPE_SLEEP_STATE_AWAKE:
self._state = const.STATE_AWAKE
elif serie.state == const.MEASURE_TYPE_SLEEP_STATE_LIGHT:
self._state = const.STATE_LIGHT
elif serie.state == const.MEASURE_TYPE_SLEEP_STATE_DEEP:
self._state = const.STATE_DEEP
elif serie.state == const.MEASURE_TYPE_SLEEP_STATE_REM:
self._state = const.STATE_REM
else:
self._state = None
async def async_update_sleep_summary(self, data) -> None:
"""Update the sleep summary data."""
if data is None:
_LOGGER.error("Provided data is None. Setting state to %s", None)
self._state = None
return
if not data.series:
_LOGGER.warning("Sleep data has no series, setting state to %s", None)
self._state = None
return
measurement = self._attribute.measurement
measure_type = self._attribute.measure_type
_LOGGER.debug("Determining total value for: %s", measurement)
total = 0
for serie in data.series:
if hasattr(serie, measure_type):
total += getattr(serie, measure_type)
self._state = round(total, 4)

View File

@ -0,0 +1,17 @@
{
"config": {
"title": "Withings",
"step": {
"user": {
"title": "User Profile.",
"description": "Select a user profile to which you want Home Assistant to map with a Withings profile. On the withings page, be sure to select the same user or data will not be labeled correctly.",
"data": {
"profile": "Profile"
}
}
},
"create_entry": {
"default": "Successfully authenticated with Withings for the selected profile."
}
}
}

View File

@ -62,6 +62,7 @@ FLOWS = [
"velbus",
"vesync",
"wemo",
"withings",
"wwlln",
"zha",
"zone",

View File

@ -843,6 +843,9 @@ niko-home-control==0.2.1
# homeassistant.components.nilu
niluclient==0.1.2
# homeassistant.components.withings
nokia==1.2.0
# homeassistant.components.nederlandse_spoorwegen
nsapi==2.7.4

View File

@ -222,6 +222,9 @@ minio==4.0.9
# homeassistant.components.ssdp
netdisco==2.6.0
# homeassistant.components.withings
nokia==1.2.0
# homeassistant.components.iqvia
# homeassistant.components.opencv
# homeassistant.components.tensorflow

View File

@ -102,6 +102,7 @@ TEST_REQUIREMENTS = (
"mficlient",
"minio",
"netdisco",
"nokia",
"numpy",
"oauth2client",
"paho-mqtt",

View File

@ -0,0 +1 @@
"""Tests for the withings component."""

View File

@ -0,0 +1,213 @@
"""Common data for for the withings component tests."""
import time
import nokia
import homeassistant.components.withings.const as const
def new_sleep_data(model, series):
"""Create simple dict to simulate api data."""
return {"series": series, "model": model}
def new_sleep_data_serie(startdate, enddate, state):
"""Create simple dict to simulate api data."""
return {"startdate": startdate, "enddate": enddate, "state": state}
def new_sleep_summary(timezone, model, startdate, enddate, date, modified, data):
"""Create simple dict to simulate api data."""
return {
"timezone": timezone,
"model": model,
"startdate": startdate,
"enddate": enddate,
"date": date,
"modified": modified,
"data": data,
}
def new_sleep_summary_detail(
wakeupduration,
lightsleepduration,
deepsleepduration,
remsleepduration,
wakeupcount,
durationtosleep,
durationtowakeup,
hr_average,
hr_min,
hr_max,
rr_average,
rr_min,
rr_max,
):
"""Create simple dict to simulate api data."""
return {
"wakeupduration": wakeupduration,
"lightsleepduration": lightsleepduration,
"deepsleepduration": deepsleepduration,
"remsleepduration": remsleepduration,
"wakeupcount": wakeupcount,
"durationtosleep": durationtosleep,
"durationtowakeup": durationtowakeup,
"hr_average": hr_average,
"hr_min": hr_min,
"hr_max": hr_max,
"rr_average": rr_average,
"rr_min": rr_min,
"rr_max": rr_max,
}
def new_measure_group(
grpid, attrib, date, created, category, deviceid, more, offset, measures
):
"""Create simple dict to simulate api data."""
return {
"grpid": grpid,
"attrib": attrib,
"date": date,
"created": created,
"category": category,
"deviceid": deviceid,
"measures": measures,
"more": more,
"offset": offset,
"comment": "blah", # deprecated
}
def new_measure(type_str, value, unit):
"""Create simple dict to simulate api data."""
return {
"value": value,
"type": type_str,
"unit": unit,
"algo": -1, # deprecated
"fm": -1, # deprecated
"fw": -1, # deprecated
}
def nokia_sleep_response(states):
"""Create a sleep response based on states."""
data = []
for state in states:
data.append(
new_sleep_data_serie(
"2019-02-01 0{}:00:00".format(str(len(data))),
"2019-02-01 0{}:00:00".format(str(len(data) + 1)),
state,
)
)
return nokia.NokiaSleep(new_sleep_data("aa", data))
NOKIA_MEASURES_RESPONSE = nokia.NokiaMeasures(
{
"updatetime": "",
"timezone": "",
"measuregrps": [
# Un-ambiguous groups.
new_measure_group(
1,
0,
time.time(),
time.time(),
1,
"DEV_ID",
False,
0,
[
new_measure(const.MEASURE_TYPE_WEIGHT, 70, 0),
new_measure(const.MEASURE_TYPE_FAT_MASS, 5, 0),
new_measure(const.MEASURE_TYPE_FAT_MASS_FREE, 60, 0),
new_measure(const.MEASURE_TYPE_MUSCLE_MASS, 50, 0),
new_measure(const.MEASURE_TYPE_BONE_MASS, 10, 0),
new_measure(const.MEASURE_TYPE_HEIGHT, 2, 0),
new_measure(const.MEASURE_TYPE_TEMP, 40, 0),
new_measure(const.MEASURE_TYPE_BODY_TEMP, 35, 0),
new_measure(const.MEASURE_TYPE_SKIN_TEMP, 20, 0),
new_measure(const.MEASURE_TYPE_FAT_RATIO, 70, -3),
new_measure(const.MEASURE_TYPE_DIASTOLIC_BP, 70, 0),
new_measure(const.MEASURE_TYPE_SYSTOLIC_BP, 100, 0),
new_measure(const.MEASURE_TYPE_HEART_PULSE, 60, 0),
new_measure(const.MEASURE_TYPE_SPO2, 95, -2),
new_measure(const.MEASURE_TYPE_HYDRATION, 95, -2),
new_measure(const.MEASURE_TYPE_PWV, 100, 0),
],
),
# Ambiguous groups (we ignore these)
new_measure_group(
1,
1,
time.time(),
time.time(),
1,
"DEV_ID",
False,
0,
[
new_measure(const.MEASURE_TYPE_WEIGHT, 71, 0),
new_measure(const.MEASURE_TYPE_FAT_MASS, 4, 0),
new_measure(const.MEASURE_TYPE_MUSCLE_MASS, 51, 0),
new_measure(const.MEASURE_TYPE_BONE_MASS, 11, 0),
new_measure(const.MEASURE_TYPE_HEIGHT, 201, 0),
new_measure(const.MEASURE_TYPE_TEMP, 41, 0),
new_measure(const.MEASURE_TYPE_BODY_TEMP, 34, 0),
new_measure(const.MEASURE_TYPE_SKIN_TEMP, 21, 0),
new_measure(const.MEASURE_TYPE_FAT_RATIO, 71, -3),
new_measure(const.MEASURE_TYPE_DIASTOLIC_BP, 71, 0),
new_measure(const.MEASURE_TYPE_SYSTOLIC_BP, 101, 0),
new_measure(const.MEASURE_TYPE_HEART_PULSE, 61, 0),
new_measure(const.MEASURE_TYPE_SPO2, 98, -2),
new_measure(const.MEASURE_TYPE_HYDRATION, 96, -2),
new_measure(const.MEASURE_TYPE_PWV, 102, 0),
],
),
],
}
)
NOKIA_SLEEP_RESPONSE = nokia_sleep_response(
[
const.MEASURE_TYPE_SLEEP_STATE_AWAKE,
const.MEASURE_TYPE_SLEEP_STATE_LIGHT,
const.MEASURE_TYPE_SLEEP_STATE_REM,
const.MEASURE_TYPE_SLEEP_STATE_DEEP,
]
)
NOKIA_SLEEP_SUMMARY_RESPONSE = nokia.NokiaSleepSummary(
{
"series": [
new_sleep_summary(
"UTC",
32,
"2019-02-01",
"2019-02-02",
"2019-02-02",
"12345",
new_sleep_summary_detail(
110, 210, 310, 410, 510, 610, 710, 810, 910, 1010, 1110, 1210, 1310
),
),
new_sleep_summary(
"UTC",
32,
"2019-02-01",
"2019-02-02",
"2019-02-02",
"12345",
new_sleep_summary_detail(
210, 310, 410, 510, 610, 710, 810, 910, 1010, 1110, 1210, 1310, 1410
),
),
]
}
)

View File

@ -0,0 +1,345 @@
"""Fixtures for withings tests."""
import time
from typing import Awaitable, Callable, List
import asynctest
import nokia
import pytest
import homeassistant.components.api as api
import homeassistant.components.http as http
import homeassistant.components.withings.const as const
from homeassistant.components.withings import CONFIG_SCHEMA, DOMAIN
from homeassistant.config import async_process_ha_core_config
from homeassistant.const import CONF_UNIT_SYSTEM, CONF_UNIT_SYSTEM_METRIC
from homeassistant.setup import async_setup_component
from .common import (
NOKIA_MEASURES_RESPONSE,
NOKIA_SLEEP_RESPONSE,
NOKIA_SLEEP_SUMMARY_RESPONSE,
)
class WithingsFactoryConfig:
"""Configuration for withings test fixture."""
PROFILE_1 = "Person 1"
PROFILE_2 = "Person 2"
def __init__(
self,
api_config: dict = None,
http_config: dict = None,
measures: List[str] = None,
unit_system: str = None,
throttle_interval: int = const.THROTTLE_INTERVAL,
nokia_request_response="DATA",
nokia_measures_response: nokia.NokiaMeasures = NOKIA_MEASURES_RESPONSE,
nokia_sleep_response: nokia.NokiaSleep = NOKIA_SLEEP_RESPONSE,
nokia_sleep_summary_response: nokia.NokiaSleepSummary = NOKIA_SLEEP_SUMMARY_RESPONSE,
) -> None:
"""Constructor."""
self._throttle_interval = throttle_interval
self._nokia_request_response = nokia_request_response
self._nokia_measures_response = nokia_measures_response
self._nokia_sleep_response = nokia_sleep_response
self._nokia_sleep_summary_response = nokia_sleep_summary_response
self._withings_config = {
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: [
WithingsFactoryConfig.PROFILE_1,
WithingsFactoryConfig.PROFILE_2,
],
}
self._api_config = api_config or {"base_url": "http://localhost/"}
self._http_config = http_config or {}
self._measures = measures
assert self._withings_config, "withings_config must be set."
assert isinstance(
self._withings_config, dict
), "withings_config must be a dict."
assert isinstance(self._api_config, dict), "api_config must be a dict."
assert isinstance(self._http_config, dict), "http_config must be a dict."
self._hass_config = {
"homeassistant": {CONF_UNIT_SYSTEM: unit_system or CONF_UNIT_SYSTEM_METRIC},
api.DOMAIN: self._api_config,
http.DOMAIN: self._http_config,
DOMAIN: self._withings_config,
}
@property
def withings_config(self):
"""Get withings component config."""
return self._withings_config
@property
def api_config(self):
"""Get api component config."""
return self._api_config
@property
def http_config(self):
"""Get http component config."""
return self._http_config
@property
def measures(self):
"""Get the measures."""
return self._measures
@property
def hass_config(self):
"""Home assistant config."""
return self._hass_config
@property
def throttle_interval(self):
"""Throttle interval."""
return self._throttle_interval
@property
def nokia_request_response(self):
"""Request response."""
return self._nokia_request_response
@property
def nokia_measures_response(self) -> nokia.NokiaMeasures:
"""Measures response."""
return self._nokia_measures_response
@property
def nokia_sleep_response(self) -> nokia.NokiaSleep:
"""Sleep response."""
return self._nokia_sleep_response
@property
def nokia_sleep_summary_response(self) -> nokia.NokiaSleepSummary:
"""Sleep summary response."""
return self._nokia_sleep_summary_response
class WithingsFactoryData:
"""Data about the configured withing test component."""
def __init__(
self,
hass,
flow_id,
nokia_auth_get_credentials_mock,
nokia_api_request_mock,
nokia_api_get_measures_mock,
nokia_api_get_sleep_mock,
nokia_api_get_sleep_summary_mock,
data_manager_get_throttle_interval_mock,
):
"""Constructor."""
self._hass = hass
self._flow_id = flow_id
self._nokia_auth_get_credentials_mock = nokia_auth_get_credentials_mock
self._nokia_api_request_mock = nokia_api_request_mock
self._nokia_api_get_measures_mock = nokia_api_get_measures_mock
self._nokia_api_get_sleep_mock = nokia_api_get_sleep_mock
self._nokia_api_get_sleep_summary_mock = nokia_api_get_sleep_summary_mock
self._data_manager_get_throttle_interval_mock = (
data_manager_get_throttle_interval_mock
)
@property
def hass(self):
"""Get hass instance."""
return self._hass
@property
def flow_id(self):
"""Get flow id."""
return self._flow_id
@property
def nokia_auth_get_credentials_mock(self):
"""Get auth credentials mock."""
return self._nokia_auth_get_credentials_mock
@property
def nokia_api_request_mock(self):
"""Get request mock."""
return self._nokia_api_request_mock
@property
def nokia_api_get_measures_mock(self):
"""Get measures mock."""
return self._nokia_api_get_measures_mock
@property
def nokia_api_get_sleep_mock(self):
"""Get sleep mock."""
return self._nokia_api_get_sleep_mock
@property
def nokia_api_get_sleep_summary_mock(self):
"""Get sleep summary mock."""
return self._nokia_api_get_sleep_summary_mock
@property
def data_manager_get_throttle_interval_mock(self):
"""Get throttle mock."""
return self._data_manager_get_throttle_interval_mock
async def configure_user(self):
"""Present a form with user profiles."""
step = await self.hass.config_entries.flow.async_configure(self.flow_id, None)
assert step["step_id"] == "user"
async def configure_profile(self, profile: str):
"""Select the user profile. Present a form with authorization link."""
print("CONFIG_PROFILE:", profile)
step = await self.hass.config_entries.flow.async_configure(
self.flow_id, {const.PROFILE: profile}
)
assert step["step_id"] == "auth"
async def configure_code(self, profile: str, code: str):
"""Handle authorization code. Create config entries."""
step = await self.hass.config_entries.flow.async_configure(
self.flow_id, {const.PROFILE: profile, const.CODE: code}
)
assert step["type"] == "external_done"
await self.hass.async_block_till_done()
step = await self.hass.config_entries.flow.async_configure(
self.flow_id, {const.PROFILE: profile, const.CODE: code}
)
assert step["type"] == "create_entry"
await self.hass.async_block_till_done()
async def configure_all(self, profile: str, code: str):
"""Configure all flow steps."""
await self.configure_user()
await self.configure_profile(profile)
await self.configure_code(profile, code)
WithingsFactory = Callable[[WithingsFactoryConfig], Awaitable[WithingsFactoryData]]
@pytest.fixture(name="withings_factory")
def withings_factory_fixture(request, hass) -> WithingsFactory:
"""Home assistant platform fixture."""
patches = []
async def factory(config: WithingsFactoryConfig) -> WithingsFactoryData:
CONFIG_SCHEMA(config.hass_config.get(DOMAIN))
await async_process_ha_core_config(
hass, config.hass_config.get("homeassistant")
)
assert await async_setup_component(hass, http.DOMAIN, config.hass_config)
assert await async_setup_component(hass, api.DOMAIN, config.hass_config)
nokia_auth_get_credentials_patch = asynctest.patch(
"nokia.NokiaAuth.get_credentials",
return_value=nokia.NokiaCredentials(
access_token="my_access_token",
token_expiry=time.time() + 600,
token_type="my_token_type",
refresh_token="my_refresh_token",
user_id="my_user_id",
client_id=config.withings_config.get(const.CLIENT_ID),
consumer_secret=config.withings_config.get(const.CLIENT_SECRET),
),
)
nokia_auth_get_credentials_mock = nokia_auth_get_credentials_patch.start()
nokia_api_request_patch = asynctest.patch(
"nokia.NokiaApi.request", return_value=config.nokia_request_response
)
nokia_api_request_mock = nokia_api_request_patch.start()
nokia_api_get_measures_patch = asynctest.patch(
"nokia.NokiaApi.get_measures", return_value=config.nokia_measures_response
)
nokia_api_get_measures_mock = nokia_api_get_measures_patch.start()
nokia_api_get_sleep_patch = asynctest.patch(
"nokia.NokiaApi.get_sleep", return_value=config.nokia_sleep_response
)
nokia_api_get_sleep_mock = nokia_api_get_sleep_patch.start()
nokia_api_get_sleep_summary_patch = asynctest.patch(
"nokia.NokiaApi.get_sleep_summary",
return_value=config.nokia_sleep_summary_response,
)
nokia_api_get_sleep_summary_mock = nokia_api_get_sleep_summary_patch.start()
data_manager_get_throttle_interval_patch = asynctest.patch(
"homeassistant.components.withings.common.WithingsDataManager"
".get_throttle_interval",
return_value=config.throttle_interval,
)
data_manager_get_throttle_interval_mock = (
data_manager_get_throttle_interval_patch.start()
)
get_measures_patch = asynctest.patch(
"homeassistant.components.withings.sensor.get_measures",
return_value=config.measures,
)
get_measures_patch.start()
patches.extend(
[
nokia_auth_get_credentials_patch,
nokia_api_request_patch,
nokia_api_get_measures_patch,
nokia_api_get_sleep_patch,
nokia_api_get_sleep_summary_patch,
data_manager_get_throttle_interval_patch,
get_measures_patch,
]
)
# Collect the flow id.
tasks = []
orig_async_create_task = hass.async_create_task
def create_task(*args):
task = orig_async_create_task(*args)
tasks.append(task)
return task
async_create_task_patch = asynctest.patch.object(
hass, "async_create_task", side_effect=create_task
)
with async_create_task_patch:
assert await async_setup_component(hass, DOMAIN, config.hass_config)
await hass.async_block_till_done()
flow_id = tasks[2].result()["flow_id"]
return WithingsFactoryData(
hass,
flow_id,
nokia_auth_get_credentials_mock,
nokia_api_request_mock,
nokia_api_get_measures_mock,
nokia_api_get_sleep_mock,
nokia_api_get_sleep_summary_mock,
data_manager_get_throttle_interval_mock,
)
def cleanup():
for patch in patches:
patch.stop()
request.addfinalizer(cleanup)
return factory

View File

@ -0,0 +1,130 @@
"""Tests for the Withings component."""
from asynctest import MagicMock
import nokia
from oauthlib.oauth2.rfc6749.errors import MissingTokenError
import pytest
from requests_oauthlib import TokenUpdated
from homeassistant.components.withings.common import (
NotAuthenticatedError,
ServiceError,
WithingsDataManager,
)
from homeassistant.exceptions import PlatformNotReady
@pytest.fixture(name="nokia_api")
def nokia_api_fixture():
"""Provide nokia api."""
nokia_api = nokia.NokiaApi.__new__(nokia.NokiaApi)
nokia_api.get_measures = MagicMock()
nokia_api.get_sleep = MagicMock()
return nokia_api
@pytest.fixture(name="data_manager")
def data_manager_fixture(hass, nokia_api: nokia.NokiaApi):
"""Provide data manager."""
return WithingsDataManager(hass, "My Profile", nokia_api)
def test_print_service():
"""Test method."""
# Go from None to True
WithingsDataManager.service_available = None
assert WithingsDataManager.print_service_available()
assert WithingsDataManager.service_available is True
assert not WithingsDataManager.print_service_available()
assert not WithingsDataManager.print_service_available()
# Go from True to False
assert WithingsDataManager.print_service_unavailable()
assert WithingsDataManager.service_available is False
assert not WithingsDataManager.print_service_unavailable()
assert not WithingsDataManager.print_service_unavailable()
# Go from False to True
assert WithingsDataManager.print_service_available()
assert WithingsDataManager.service_available is True
assert not WithingsDataManager.print_service_available()
assert not WithingsDataManager.print_service_available()
# Go from Non to False
WithingsDataManager.service_available = None
assert WithingsDataManager.print_service_unavailable()
assert WithingsDataManager.service_available is False
assert not WithingsDataManager.print_service_unavailable()
assert not WithingsDataManager.print_service_unavailable()
async def test_data_manager_call(data_manager):
"""Test method."""
# Token refreshed.
def hello_func():
return "HELLO2"
function = MagicMock(side_effect=[TokenUpdated("my_token"), hello_func()])
result = await data_manager.call(function)
assert result == "HELLO2"
assert function.call_count == 2
# Too many token refreshes.
function = MagicMock(
side_effect=[TokenUpdated("my_token"), TokenUpdated("my_token")]
)
try:
result = await data_manager.call(function)
assert False, "This should not have ran."
except ServiceError:
assert True
assert function.call_count == 2
# Not authenticated 1.
test_function = MagicMock(side_effect=MissingTokenError("Error Code 401"))
try:
result = await data_manager.call(test_function)
assert False, "An exception should have been thrown."
except NotAuthenticatedError:
assert True
# Not authenticated 2.
test_function = MagicMock(side_effect=Exception("Error Code 401"))
try:
result = await data_manager.call(test_function)
assert False, "An exception should have been thrown."
except NotAuthenticatedError:
assert True
# Service error.
test_function = MagicMock(side_effect=PlatformNotReady())
try:
result = await data_manager.call(test_function)
assert False, "An exception should have been thrown."
except PlatformNotReady:
assert True
async def test_data_manager_call_throttle_enabled(data_manager):
"""Test method."""
hello_func = MagicMock(return_value="HELLO2")
result = await data_manager.call(hello_func, throttle_domain="test")
assert result == "HELLO2"
result = await data_manager.call(hello_func, throttle_domain="test")
assert result == "HELLO2"
assert hello_func.call_count == 1
async def test_data_manager_call_throttle_disabled(data_manager):
"""Test method."""
hello_func = MagicMock(return_value="HELLO2")
result = await data_manager.call(hello_func)
assert result == "HELLO2"
result = await data_manager.call(hello_func)
assert result == "HELLO2"
assert hello_func.call_count == 2

View File

@ -0,0 +1,175 @@
"""Tests for the Withings config flow."""
from aiohttp.web_request import BaseRequest
from asynctest import CoroutineMock, MagicMock
import pytest
from homeassistant import setup, data_entry_flow
import homeassistant.components.api as api
import homeassistant.components.http as http
from homeassistant.components.withings import const
from homeassistant.components.withings.config_flow import (
register_flow_implementation,
WithingsFlowHandler,
WithingsAuthCallbackView,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.helpers.typing import HomeAssistantType
@pytest.fixture(name="flow_handler")
def flow_handler_fixture(hass: HomeAssistantType):
"""Provide flow handler."""
flow_handler = WithingsFlowHandler()
flow_handler.hass = hass
return flow_handler
@pytest.fixture(name="setup_hass")
async def setup_hass_fixture(hass: HomeAssistantType):
"""Provide hass instance."""
config = {
http.DOMAIN: {},
api.DOMAIN: {"base_url": "http://localhost/"},
const.DOMAIN: {
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_secret",
const.PROFILES: ["Person 1", "Person 2"],
},
}
hass.data = {}
await setup.async_setup_component(hass, "http", config)
await setup.async_setup_component(hass, "api", config)
return hass
def test_flow_handler_init(flow_handler: WithingsFlowHandler):
"""Test the init of the flow handler."""
assert not flow_handler.flow_profile
def test_flow_handler_async_profile_config_entry(
hass: HomeAssistantType, flow_handler: WithingsFlowHandler
):
"""Test profile config entry."""
config_entries = [
ConfigEntry(
version=1,
domain=const.DOMAIN,
title="AAA",
data={},
source="source",
connection_class="connection_class",
system_options={},
),
ConfigEntry(
version=1,
domain=const.DOMAIN,
title="Person 1",
data={const.PROFILE: "Person 1"},
source="source",
connection_class="connection_class",
system_options={},
),
ConfigEntry(
version=1,
domain=const.DOMAIN,
title="BBB",
data={},
source="source",
connection_class="connection_class",
system_options={},
),
]
hass.config_entries.async_entries = MagicMock(return_value=config_entries)
config_entry = flow_handler.async_profile_config_entry
assert not config_entry("GGGG")
hass.config_entries.async_entries.assert_called_with(const.DOMAIN)
assert not config_entry("CCC")
hass.config_entries.async_entries.assert_called_with(const.DOMAIN)
assert config_entry("Person 1") == config_entries[1]
hass.config_entries.async_entries.assert_called_with(const.DOMAIN)
def test_flow_handler_get_auth_client(
hass: HomeAssistantType, flow_handler: WithingsFlowHandler
):
"""Test creation of an auth client."""
register_flow_implementation(
hass, "my_client_id", "my_client_secret", "http://localhost/", ["Person 1"]
)
client = flow_handler.get_auth_client("Person 1")
assert client.client_id == "my_client_id"
assert client.consumer_secret == "my_client_secret"
assert client.callback_uri.startswith(
"http://localhost/api/withings/authorize?flow_id="
)
assert client.callback_uri.endswith("&profile=Person 1")
assert client.scope == "user.info,user.metrics,user.activity"
async def test_auth_callback_view_get(hass: HomeAssistantType):
"""Test get api path."""
view = WithingsAuthCallbackView()
hass.config_entries.flow.async_configure = CoroutineMock(return_value="AAAA")
request = MagicMock(spec=BaseRequest)
request.app = {"hass": hass}
# No args
request.query = {}
response = await view.get(request)
assert response.status == 400
hass.config_entries.flow.async_configure.assert_not_called()
hass.config_entries.flow.async_configure.reset_mock()
# Checking flow_id
request.query = {"flow_id": "my_flow_id"}
response = await view.get(request)
assert response.status == 400
hass.config_entries.flow.async_configure.assert_not_called()
hass.config_entries.flow.async_configure.reset_mock()
# Checking flow_id and profile
request.query = {"flow_id": "my_flow_id", "profile": "my_profile"}
response = await view.get(request)
assert response.status == 400
hass.config_entries.flow.async_configure.assert_not_called()
hass.config_entries.flow.async_configure.reset_mock()
# Checking flow_id, profile, code
request.query = {
"flow_id": "my_flow_id",
"profile": "my_profile",
"code": "my_code",
}
response = await view.get(request)
assert response.status == 200
hass.config_entries.flow.async_configure.assert_called_with(
"my_flow_id", {const.PROFILE: "my_profile", const.CODE: "my_code"}
)
hass.config_entries.flow.async_configure.reset_mock()
# Exception thrown
hass.config_entries.flow.async_configure = CoroutineMock(
side_effect=data_entry_flow.UnknownFlow()
)
request.query = {
"flow_id": "my_flow_id",
"profile": "my_profile",
"code": "my_code",
}
response = await view.get(request)
assert response.status == 400
hass.config_entries.flow.async_configure.assert_called_with(
"my_flow_id", {const.PROFILE: "my_profile", const.CODE: "my_code"}
)
hass.config_entries.flow.async_configure.reset_mock()

View File

@ -0,0 +1,196 @@
"""Tests for the Withings component."""
from asynctest import MagicMock
import voluptuous as vol
import homeassistant.components.api as api
import homeassistant.components.http as http
from homeassistant.components.withings import async_setup, const, CONFIG_SCHEMA
from .conftest import WithingsFactory, WithingsFactoryConfig
BASE_HASS_CONFIG = {
http.DOMAIN: {},
api.DOMAIN: {"base_url": "http://localhost/"},
const.DOMAIN: None,
}
def config_schema_validate(withings_config):
"""Assert a schema config succeeds."""
hass_config = BASE_HASS_CONFIG.copy()
hass_config[const.DOMAIN] = withings_config
return CONFIG_SCHEMA(hass_config)
def config_schema_assert_fail(withings_config):
"""Assert a schema config will fail."""
try:
config_schema_validate(withings_config)
assert False, "This line should not have run."
except vol.error.MultipleInvalid:
assert True
def test_config_schema_basic_config():
"""Test schema."""
config_schema_validate(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: ["Person 1", "Person 2"],
}
)
def test_config_schema_client_id():
"""Test schema."""
config_schema_assert_fail(
{
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: ["Person 1", "Person 2"],
}
)
config_schema_assert_fail(
{
const.CLIENT_SECRET: "my_client_secret",
const.CLIENT_ID: "",
const.PROFILES: ["Person 1"],
}
)
config_schema_validate(
{
const.CLIENT_SECRET: "my_client_secret",
const.CLIENT_ID: "my_client_id",
const.PROFILES: ["Person 1"],
}
)
def test_config_schema_client_secret():
"""Test schema."""
config_schema_assert_fail(
{const.CLIENT_ID: "my_client_id", const.PROFILES: ["Person 1"]}
)
config_schema_assert_fail(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "",
const.PROFILES: ["Person 1"],
}
)
config_schema_validate(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: ["Person 1"],
}
)
def test_config_schema_profiles():
"""Test schema."""
config_schema_assert_fail(
{const.CLIENT_ID: "my_client_id", const.CLIENT_SECRET: "my_client_secret"}
)
config_schema_assert_fail(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: "",
}
)
config_schema_assert_fail(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: [],
}
)
config_schema_assert_fail(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: ["Person 1", "Person 1"],
}
)
config_schema_validate(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: ["Person 1"],
}
)
config_schema_validate(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: ["Person 1", "Person 2"],
}
)
def test_config_schema_base_url():
"""Test schema."""
config_schema_validate(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.PROFILES: ["Person 1"],
}
)
config_schema_assert_fail(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.BASE_URL: 123,
const.PROFILES: ["Person 1"],
}
)
config_schema_assert_fail(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.BASE_URL: "",
const.PROFILES: ["Person 1"],
}
)
config_schema_assert_fail(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.BASE_URL: "blah blah",
const.PROFILES: ["Person 1"],
}
)
config_schema_validate(
{
const.CLIENT_ID: "my_client_id",
const.CLIENT_SECRET: "my_client_secret",
const.BASE_URL: "https://www.blah.blah.blah/blah/blah",
const.PROFILES: ["Person 1"],
}
)
async def test_async_setup_no_config(hass):
"""Test method."""
hass.async_create_task = MagicMock()
await async_setup(hass, {})
hass.async_create_task.assert_not_called()
async def test_async_setup_teardown(withings_factory: WithingsFactory, hass):
"""Test method."""
data = await withings_factory(WithingsFactoryConfig(measures=[const.MEAS_TEMP_C]))
profile = WithingsFactoryConfig.PROFILE_1
await data.configure_all(profile, "authorization_code")
entries = hass.config_entries.async_entries(const.DOMAIN)
assert entries
for entry in entries:
await hass.config_entries.async_unload(entry.entry_id)

View File

@ -0,0 +1,304 @@
"""Tests for the Withings component."""
from unittest.mock import MagicMock, patch
import asynctest
from nokia import NokiaApi, NokiaMeasures, NokiaSleep, NokiaSleepSummary
import pytest
from homeassistant.components.withings import DOMAIN
from homeassistant.components.withings.common import NotAuthenticatedError
import homeassistant.components.withings.const as const
from homeassistant.components.withings.sensor import async_setup_entry
from homeassistant.config_entries import ConfigEntry, SOURCE_USER
from homeassistant.const import STATE_UNKNOWN
from homeassistant.helpers.entity_component import async_update_entity
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import slugify
from .common import nokia_sleep_response
from .conftest import WithingsFactory, WithingsFactoryConfig
def get_entity_id(measure, profile):
"""Get an entity id for a measure and profile."""
return "sensor.{}_{}_{}".format(DOMAIN, measure, slugify(profile))
def assert_state_equals(hass: HomeAssistantType, profile: str, measure: str, expected):
"""Assert the state of a withings sensor."""
entity_id = get_entity_id(measure, profile)
state_obj = hass.states.get(entity_id)
assert state_obj, "Expected entity {} to exist but it did not".format(entity_id)
assert state_obj.state == str(
expected
), "Expected {} but was {} for measure {}".format(
expected, state_obj.state, measure
)
async def test_health_sensor_properties(withings_factory: WithingsFactory):
"""Test method."""
data = await withings_factory(WithingsFactoryConfig(measures=[const.MEAS_HEIGHT_M]))
await data.configure_all(WithingsFactoryConfig.PROFILE_1, "authorization_code")
state = data.hass.states.get("sensor.withings_height_m_person_1")
state_dict = state.as_dict()
assert state_dict.get("state") == "2"
assert state_dict.get("attributes") == {
"measurement": "height_m",
"measure_type": 4,
"friendly_name": "Withings height_m person_1",
"unit_of_measurement": "m",
"icon": "mdi:ruler",
}
SENSOR_TEST_DATA = [
(const.MEAS_WEIGHT_KG, 70),
(const.MEAS_FAT_MASS_KG, 5),
(const.MEAS_FAT_FREE_MASS_KG, 60),
(const.MEAS_MUSCLE_MASS_KG, 50),
(const.MEAS_BONE_MASS_KG, 10),
(const.MEAS_HEIGHT_M, 2),
(const.MEAS_FAT_RATIO_PCT, 0.07),
(const.MEAS_DIASTOLIC_MMHG, 70),
(const.MEAS_SYSTOLIC_MMGH, 100),
(const.MEAS_HEART_PULSE_BPM, 60),
(const.MEAS_SPO2_PCT, 0.95),
(const.MEAS_HYDRATION, 0.95),
(const.MEAS_PWV, 100),
(const.MEAS_SLEEP_WAKEUP_DURATION_SECONDS, 320),
(const.MEAS_SLEEP_LIGHT_DURATION_SECONDS, 520),
(const.MEAS_SLEEP_DEEP_DURATION_SECONDS, 720),
(const.MEAS_SLEEP_REM_DURATION_SECONDS, 920),
(const.MEAS_SLEEP_WAKEUP_COUNT, 1120),
(const.MEAS_SLEEP_TOSLEEP_DURATION_SECONDS, 1320),
(const.MEAS_SLEEP_TOWAKEUP_DURATION_SECONDS, 1520),
(const.MEAS_SLEEP_HEART_RATE_AVERAGE, 1720),
(const.MEAS_SLEEP_HEART_RATE_MIN, 1920),
(const.MEAS_SLEEP_HEART_RATE_MAX, 2120),
(const.MEAS_SLEEP_RESPIRATORY_RATE_AVERAGE, 2320),
(const.MEAS_SLEEP_RESPIRATORY_RATE_MIN, 2520),
(const.MEAS_SLEEP_RESPIRATORY_RATE_MAX, 2720),
]
@pytest.mark.parametrize("measure,expected", SENSOR_TEST_DATA)
async def test_health_sensor_throttled(
withings_factory: WithingsFactory, measure, expected
):
"""Test method."""
data = await withings_factory(WithingsFactoryConfig(measures=measure))
profile = WithingsFactoryConfig.PROFILE_1
await data.configure_all(profile, "authorization_code")
# Checking initial data.
assert_state_equals(data.hass, profile, measure, expected)
# Encountering a throttled data.
await async_update_entity(data.hass, get_entity_id(measure, profile))
assert_state_equals(data.hass, profile, measure, expected)
NONE_SENSOR_TEST_DATA = [
(const.MEAS_WEIGHT_KG, STATE_UNKNOWN),
(const.MEAS_SLEEP_STATE, STATE_UNKNOWN),
(const.MEAS_SLEEP_RESPIRATORY_RATE_MAX, STATE_UNKNOWN),
]
@pytest.mark.parametrize("measure,expected", NONE_SENSOR_TEST_DATA)
async def test_health_sensor_state_none(
withings_factory: WithingsFactory, measure, expected
):
"""Test method."""
data = await withings_factory(
WithingsFactoryConfig(
measures=measure,
nokia_measures_response=None,
nokia_sleep_response=None,
nokia_sleep_summary_response=None,
)
)
profile = WithingsFactoryConfig.PROFILE_1
await data.configure_all(profile, "authorization_code")
# Checking initial data.
assert_state_equals(data.hass, profile, measure, expected)
# Encountering a throttled data.
await async_update_entity(data.hass, get_entity_id(measure, profile))
assert_state_equals(data.hass, profile, measure, expected)
EMPTY_SENSOR_TEST_DATA = [
(const.MEAS_WEIGHT_KG, STATE_UNKNOWN),
(const.MEAS_SLEEP_STATE, STATE_UNKNOWN),
(const.MEAS_SLEEP_RESPIRATORY_RATE_MAX, STATE_UNKNOWN),
]
@pytest.mark.parametrize("measure,expected", EMPTY_SENSOR_TEST_DATA)
async def test_health_sensor_state_empty(
withings_factory: WithingsFactory, measure, expected
):
"""Test method."""
data = await withings_factory(
WithingsFactoryConfig(
measures=measure,
nokia_measures_response=NokiaMeasures({"measuregrps": []}),
nokia_sleep_response=NokiaSleep({"series": []}),
nokia_sleep_summary_response=NokiaSleepSummary({"series": []}),
)
)
profile = WithingsFactoryConfig.PROFILE_1
await data.configure_all(profile, "authorization_code")
# Checking initial data.
assert_state_equals(data.hass, profile, measure, expected)
# Encountering a throttled data.
await async_update_entity(data.hass, get_entity_id(measure, profile))
assert_state_equals(data.hass, profile, measure, expected)
SLEEP_STATES_TEST_DATA = [
(
const.STATE_AWAKE,
[const.MEASURE_TYPE_SLEEP_STATE_DEEP, const.MEASURE_TYPE_SLEEP_STATE_AWAKE],
),
(
const.STATE_LIGHT,
[const.MEASURE_TYPE_SLEEP_STATE_DEEP, const.MEASURE_TYPE_SLEEP_STATE_LIGHT],
),
(
const.STATE_REM,
[const.MEASURE_TYPE_SLEEP_STATE_DEEP, const.MEASURE_TYPE_SLEEP_STATE_REM],
),
(
const.STATE_DEEP,
[const.MEASURE_TYPE_SLEEP_STATE_LIGHT, const.MEASURE_TYPE_SLEEP_STATE_DEEP],
),
(const.STATE_UNKNOWN, [const.MEASURE_TYPE_SLEEP_STATE_LIGHT, "blah,"]),
]
@pytest.mark.parametrize("expected,sleep_states", SLEEP_STATES_TEST_DATA)
async def test_sleep_state_throttled(
withings_factory: WithingsFactory, expected, sleep_states
):
"""Test method."""
measure = const.MEAS_SLEEP_STATE
data = await withings_factory(
WithingsFactoryConfig(
measures=[measure], nokia_sleep_response=nokia_sleep_response(sleep_states)
)
)
profile = WithingsFactoryConfig.PROFILE_1
await data.configure_all(profile, "authorization_code")
# Check initial data.
assert_state_equals(data.hass, profile, measure, expected)
# Encountering a throttled data.
await async_update_entity(data.hass, get_entity_id(measure, profile))
assert_state_equals(data.hass, profile, measure, expected)
async def test_async_setup_check_credentials(
hass: HomeAssistantType, withings_factory: WithingsFactory
):
"""Test method."""
check_creds_patch = asynctest.patch(
"homeassistant.components.withings.common.WithingsDataManager"
".check_authenticated",
side_effect=NotAuthenticatedError(),
)
async_init_patch = asynctest.patch.object(
hass.config_entries.flow,
"async_init",
wraps=hass.config_entries.flow.async_init,
)
with check_creds_patch, async_init_patch as async_init_mock:
data = await withings_factory(
WithingsFactoryConfig(measures=[const.MEAS_HEIGHT_M])
)
profile = WithingsFactoryConfig.PROFILE_1
await data.configure_all(profile, "authorization_code")
async_init_mock.assert_called_with(
const.DOMAIN,
context={"source": SOURCE_USER, const.PROFILE: profile},
data={},
)
async def test_async_setup_entry_credentials_saver(hass: HomeAssistantType):
"""Test method."""
expected_creds = {
"access_token": "my_access_token2",
"refresh_token": "my_refresh_token2",
"token_type": "my_token_type2",
"expires_in": "2",
}
original_nokia_api = NokiaApi
nokia_api_instance = None
def new_nokia_api(*args, **kwargs):
nonlocal nokia_api_instance
nokia_api_instance = original_nokia_api(*args, **kwargs)
nokia_api_instance.request = MagicMock()
return nokia_api_instance
nokia_api_patch = patch("nokia.NokiaApi", side_effect=new_nokia_api)
session_patch = patch("requests_oauthlib.OAuth2Session")
client_patch = patch("oauthlib.oauth2.WebApplicationClient")
update_entry_patch = patch.object(
hass.config_entries,
"async_update_entry",
wraps=hass.config_entries.async_update_entry,
)
with session_patch, client_patch, nokia_api_patch, update_entry_patch:
async_add_entities = MagicMock()
hass.config_entries.async_update_entry = MagicMock()
config_entry = ConfigEntry(
version=1,
domain=const.DOMAIN,
title="my title",
data={
const.PROFILE: "Person 1",
const.CREDENTIALS: {
"access_token": "my_access_token",
"refresh_token": "my_refresh_token",
"token_type": "my_token_type",
"token_expiry": "9999999999",
},
},
source="source",
connection_class="conn_class",
system_options={},
)
await async_setup_entry(hass, config_entry, async_add_entities)
nokia_api_instance.set_token(expected_creds)
new_creds = config_entry.data[const.CREDENTIALS]
assert new_creds["access_token"] == "my_access_token2"