diff --git a/homeassistant/auth/const.py b/homeassistant/auth/const.py index 082d8966275..2e57986958c 100644 --- a/homeassistant/auth/const.py +++ b/homeassistant/auth/const.py @@ -2,3 +2,4 @@ from datetime import timedelta ACCESS_TOKEN_EXPIRATION = timedelta(minutes=30) +MFA_SESSION_EXPIRATION = timedelta(minutes=5) diff --git a/homeassistant/auth/mfa_modules/__init__.py b/homeassistant/auth/mfa_modules/__init__.py index 603ca6ff3b1..1746ef38f95 100644 --- a/homeassistant/auth/mfa_modules/__init__.py +++ b/homeassistant/auth/mfa_modules/__init__.py @@ -1,5 +1,4 @@ """Plugable auth modules for Home Assistant.""" -from datetime import timedelta import importlib import logging import types @@ -23,8 +22,6 @@ MULTI_FACTOR_AUTH_MODULE_SCHEMA = vol.Schema({ vol.Optional(CONF_ID): str, }, extra=vol.ALLOW_EXTRA) -SESSION_EXPIRATION = timedelta(minutes=5) - DATA_REQS = 'mfa_auth_module_reqs_processed' _LOGGER = logging.getLogger(__name__) @@ -34,6 +31,7 @@ class MultiFactorAuthModule: """Multi-factor Auth Module of validation function.""" DEFAULT_TITLE = 'Unnamed auth module' + MAX_RETRY_TIME = 3 def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None: """Initialize an auth module.""" @@ -84,7 +82,7 @@ class MultiFactorAuthModule: """Return whether user is setup.""" raise NotImplementedError - async def async_validation( + async def async_validate( self, user_id: str, user_input: Dict[str, Any]) -> bool: """Return True if validation passed.""" raise NotImplementedError diff --git a/homeassistant/auth/mfa_modules/insecure_example.py b/homeassistant/auth/mfa_modules/insecure_example.py index 9c72111ef96..9804cbcf635 100644 --- a/homeassistant/auth/mfa_modules/insecure_example.py +++ b/homeassistant/auth/mfa_modules/insecure_example.py @@ -77,7 +77,7 @@ class InsecureExampleModule(MultiFactorAuthModule): return True return False - async def async_validation( + async def async_validate( self, user_id: str, user_input: Dict[str, Any]) -> bool: """Return True if validation passed.""" for data in self._data: diff --git a/homeassistant/auth/mfa_modules/notify.py b/homeassistant/auth/mfa_modules/notify.py new file mode 100644 index 00000000000..84f9de614c1 --- /dev/null +++ b/homeassistant/auth/mfa_modules/notify.py @@ -0,0 +1,325 @@ +"""HMAC-based One-time Password auth module. + +Sending HOTP through notify service +""" +import logging +from collections import OrderedDict +from typing import Any, Dict, Optional, Tuple, List # noqa: F401 + +import attr +import voluptuous as vol + +from homeassistant.const import CONF_EXCLUDE, CONF_INCLUDE +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers import config_validation as cv + +from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \ + MULTI_FACTOR_AUTH_MODULE_SCHEMA, SetupFlow + +REQUIREMENTS = ['pyotp==2.2.6'] + +CONF_MESSAGE = 'message' + +CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend({ + vol.Optional(CONF_INCLUDE): vol.All(cv.ensure_list, [cv.string]), + vol.Optional(CONF_EXCLUDE): vol.All(cv.ensure_list, [cv.string]), + vol.Optional(CONF_MESSAGE, + default='{} is your Home Assistant login code'): str +}, extra=vol.PREVENT_EXTRA) + +STORAGE_VERSION = 1 +STORAGE_KEY = 'auth_module.notify' +STORAGE_USERS = 'users' +STORAGE_USER_ID = 'user_id' + +INPUT_FIELD_CODE = 'code' + +_LOGGER = logging.getLogger(__name__) + + +def _generate_secret() -> str: + """Generate a secret.""" + import pyotp + return str(pyotp.random_base32()) + + +def _generate_random() -> int: + """Generate a 8 digit number.""" + import pyotp + return int(pyotp.random_base32(length=8, chars=list('1234567890'))) + + +def _generate_otp(secret: str, count: int) -> str: + """Generate one time password.""" + import pyotp + return str(pyotp.HOTP(secret).at(count)) + + +def _verify_otp(secret: str, otp: str, count: int) -> bool: + """Verify one time password.""" + import pyotp + return bool(pyotp.HOTP(secret).verify(otp, count)) + + +@attr.s(slots=True) +class NotifySetting: + """Store notify setting for one user.""" + + secret = attr.ib(type=str, factory=_generate_secret) # not persistent + counter = attr.ib(type=int, factory=_generate_random) # not persistent + notify_service = attr.ib(type=Optional[str], default=None) + target = attr.ib(type=Optional[str], default=None) + + +_UsersDict = Dict[str, NotifySetting] + + +@MULTI_FACTOR_AUTH_MODULES.register('notify') +class NotifyAuthModule(MultiFactorAuthModule): + """Auth module send hmac-based one time password by notify service.""" + + DEFAULT_TITLE = 'Notify One-Time Password' + + def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None: + """Initialize the user data store.""" + super().__init__(hass, config) + self._user_settings = None # type: Optional[_UsersDict] + self._user_store = hass.helpers.storage.Store( + STORAGE_VERSION, STORAGE_KEY) + self._include = config.get(CONF_INCLUDE, []) + self._exclude = config.get(CONF_EXCLUDE, []) + self._message_template = config[CONF_MESSAGE] + + @property + def input_schema(self) -> vol.Schema: + """Validate login flow input data.""" + return vol.Schema({INPUT_FIELD_CODE: str}) + + async def _async_load(self) -> None: + """Load stored data.""" + data = await self._user_store.async_load() + + if data is None: + data = {STORAGE_USERS: {}} + + self._user_settings = { + user_id: NotifySetting(**setting) + for user_id, setting in data.get(STORAGE_USERS, {}).items() + } + + async def _async_save(self) -> None: + """Save data.""" + if self._user_settings is None: + return + + await self._user_store.async_save({STORAGE_USERS: { + user_id: attr.asdict( + notify_setting, filter=attr.filters.exclude( + attr.fields(NotifySetting).secret, + attr.fields(NotifySetting).counter, + )) + for user_id, notify_setting + in self._user_settings.items() + }}) + + @callback + def aync_get_available_notify_services(self) -> List[str]: + """Return list of notify services.""" + unordered_services = set() + + for service in self.hass.services.async_services().get('notify', {}): + if service not in self._exclude: + unordered_services.add(service) + + if self._include: + unordered_services &= set(self._include) + + return sorted(unordered_services) + + async def async_setup_flow(self, user_id: str) -> SetupFlow: + """Return a data entry flow handler for setup module. + + Mfa module should extend SetupFlow + """ + return NotifySetupFlow( + self, self.input_schema, user_id, + self.aync_get_available_notify_services()) + + async def async_setup_user(self, user_id: str, setup_data: Any) -> Any: + """Set up auth module for user.""" + if self._user_settings is None: + await self._async_load() + assert self._user_settings is not None + + self._user_settings[user_id] = NotifySetting( + notify_service=setup_data.get('notify_service'), + target=setup_data.get('target'), + ) + + await self._async_save() + + async def async_depose_user(self, user_id: str) -> None: + """Depose auth module for user.""" + if self._user_settings is None: + await self._async_load() + assert self._user_settings is not None + + if self._user_settings.pop(user_id, None): + await self._async_save() + + async def async_is_user_setup(self, user_id: str) -> bool: + """Return whether user is setup.""" + if self._user_settings is None: + await self._async_load() + assert self._user_settings is not None + + return user_id in self._user_settings + + async def async_validate( + self, user_id: str, user_input: Dict[str, Any]) -> bool: + """Return True if validation passed.""" + if self._user_settings is None: + await self._async_load() + assert self._user_settings is not None + + notify_setting = self._user_settings.get(user_id, None) + if notify_setting is None: + return False + + # user_input has been validate in caller + return await self.hass.async_add_executor_job( + _verify_otp, notify_setting.secret, + user_input.get(INPUT_FIELD_CODE, ''), + notify_setting.counter) + + async def async_initialize_login_mfa_step(self, user_id: str) -> None: + """Generate code and notify user.""" + if self._user_settings is None: + await self._async_load() + assert self._user_settings is not None + + notify_setting = self._user_settings.get(user_id, None) + if notify_setting is None: + raise ValueError('Cannot find user_id') + + def generate_secret_and_one_time_password() -> str: + """Generate and send one time password.""" + assert notify_setting + # secret and counter are not persistent + notify_setting.secret = _generate_secret() + notify_setting.counter = _generate_random() + return _generate_otp( + notify_setting.secret, notify_setting.counter) + + code = await self.hass.async_add_executor_job( + generate_secret_and_one_time_password) + + await self.async_notify_user(user_id, code) + + async def async_notify_user(self, user_id: str, code: str) -> None: + """Send code by user's notify service.""" + if self._user_settings is None: + await self._async_load() + assert self._user_settings is not None + + notify_setting = self._user_settings.get(user_id, None) + if notify_setting is None: + _LOGGER.error('Cannot find user %s', user_id) + return + + await self.async_notify( # type: ignore + code, notify_setting.notify_service, notify_setting.target) + + async def async_notify(self, code: str, notify_service: str, + target: Optional[str] = None) -> None: + """Send code by notify service.""" + data = {'message': self._message_template.format(code)} + if target: + data['target'] = [target] + + await self.hass.services.async_call('notify', notify_service, data) + + +class NotifySetupFlow(SetupFlow): + """Handler for the setup flow.""" + + def __init__(self, auth_module: NotifyAuthModule, + setup_schema: vol.Schema, + user_id: str, + available_notify_services: List[str]) -> None: + """Initialize the setup flow.""" + super().__init__(auth_module, setup_schema, user_id) + # to fix typing complaint + self._auth_module = auth_module # type: NotifyAuthModule + self._available_notify_services = available_notify_services + self._secret = None # type: Optional[str] + self._count = None # type: Optional[int] + self._notify_service = None # type: Optional[str] + self._target = None # type: Optional[str] + + async def async_step_init( + self, user_input: Optional[Dict[str, str]] = None) \ + -> Dict[str, Any]: + """Let user select available notify services.""" + errors = {} # type: Dict[str, str] + + hass = self._auth_module.hass + if user_input: + self._notify_service = user_input['notify_service'] + self._target = user_input.get('target') + self._secret = await hass.async_add_executor_job(_generate_secret) + self._count = await hass.async_add_executor_job(_generate_random) + + return await self.async_step_setup() + + if not self._available_notify_services: + return self.async_abort(reason='no_available_service') + + schema = OrderedDict() # type: Dict[str, Any] + schema['notify_service'] = vol.In(self._available_notify_services) + schema['target'] = vol.Optional(str) + + return self.async_show_form( + step_id='init', + data_schema=vol.Schema(schema), + errors=errors + ) + + async def async_step_setup( + self, user_input: Optional[Dict[str, str]] = None) \ + -> Dict[str, Any]: + """Verify user can recevie one-time password.""" + errors = {} # type: Dict[str, str] + + hass = self._auth_module.hass + if user_input: + verified = await hass.async_add_executor_job( + _verify_otp, self._secret, user_input['code'], self._count) + if verified: + await self._auth_module.async_setup_user( + self._user_id, { + 'notify_service': self._notify_service, + 'target': self._target, + }) + return self.async_create_entry( + title=self._auth_module.name, + data={} + ) + + errors['base'] = 'invalid_code' + + # generate code every time, no retry logic + assert self._secret and self._count + code = await hass.async_add_executor_job( + _generate_otp, self._secret, self._count) + + assert self._notify_service + await self._auth_module.async_notify( + code, self._notify_service, self._target) + + return self.async_show_form( + step_id='setup', + data_schema=self._setup_schema, + description_placeholders={'notify_service': self._notify_service}, + errors=errors, + ) diff --git a/homeassistant/auth/mfa_modules/totp.py b/homeassistant/auth/mfa_modules/totp.py index 50cd9d33466..625cc0302e1 100644 --- a/homeassistant/auth/mfa_modules/totp.py +++ b/homeassistant/auth/mfa_modules/totp.py @@ -60,6 +60,7 @@ class TotpAuthModule(MultiFactorAuthModule): """Auth module validate time-based one time password.""" DEFAULT_TITLE = 'Time-based One Time Password' + MAX_RETRY_TIME = 5 def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None: """Initialize the user data store.""" @@ -130,7 +131,7 @@ class TotpAuthModule(MultiFactorAuthModule): return user_id in self._users # type: ignore - async def async_validation( + async def async_validate( self, user_id: str, user_input: Dict[str, Any]) -> bool: """Return True if validation passed.""" if self._users is None: diff --git a/homeassistant/auth/providers/__init__.py b/homeassistant/auth/providers/__init__.py index 3cb1c6b121e..e96f6d7ebba 100644 --- a/homeassistant/auth/providers/__init__.py +++ b/homeassistant/auth/providers/__init__.py @@ -15,8 +15,8 @@ from homeassistant.util import dt as dt_util from homeassistant.util.decorator import Registry from ..auth_store import AuthStore +from ..const import MFA_SESSION_EXPIRATION from ..models import Credentials, User, UserMeta # noqa: F401 -from ..mfa_modules import SESSION_EXPIRATION _LOGGER = logging.getLogger(__name__) DATA_REQS = 'auth_prov_reqs_processed' @@ -171,6 +171,7 @@ class LoginFlow(data_entry_flow.FlowHandler): self._auth_manager = auth_provider.hass.auth # type: ignore self.available_mfa_modules = {} # type: Dict[str, str] self.created_at = dt_util.utcnow() + self.invalid_mfa_times = 0 self.user = None # type: Optional[User] async def async_step_init( @@ -212,6 +213,8 @@ class LoginFlow(data_entry_flow.FlowHandler): self, user_input: Optional[Dict[str, str]] = None) \ -> Dict[str, Any]: """Handle the step of mfa validation.""" + assert self.user + errors = {} auth_module = self._auth_manager.get_auth_mfa_module( @@ -221,25 +224,34 @@ class LoginFlow(data_entry_flow.FlowHandler): # will show invalid_auth_module error return await self.async_step_select_mfa_module(user_input={}) + if user_input is None and hasattr(auth_module, + 'async_initialize_login_mfa_step'): + await auth_module.async_initialize_login_mfa_step(self.user.id) + if user_input is not None: - expires = self.created_at + SESSION_EXPIRATION + expires = self.created_at + MFA_SESSION_EXPIRATION if dt_util.utcnow() > expires: return self.async_abort( reason='login_expired' ) - result = await auth_module.async_validation( - self.user.id, user_input) # type: ignore + result = await auth_module.async_validate( + self.user.id, user_input) if not result: errors['base'] = 'invalid_code' + self.invalid_mfa_times += 1 + if self.invalid_mfa_times >= auth_module.MAX_RETRY_TIME > 0: + return self.async_abort( + reason='too_many_retry' + ) if not errors: return await self.async_finish(self.user) description_placeholders = { 'mfa_module_name': auth_module.name, - 'mfa_module_id': auth_module.id - } # type: Dict[str, str] + 'mfa_module_id': auth_module.id, + } # type: Dict[str, Optional[str]] return self.async_show_form( step_id='mfa', diff --git a/homeassistant/components/auth/.translations/en.json b/homeassistant/components/auth/.translations/en.json index a0fd20e9d08..21cb45e3050 100644 --- a/homeassistant/components/auth/.translations/en.json +++ b/homeassistant/components/auth/.translations/en.json @@ -1,5 +1,24 @@ { "mfa_setup": { + "notify": { + "abort": { + "no_available_service": "No available notify services." + }, + "error": { + "invalid_code": "Invalid code, please try again." + }, + "step": { + "init": { + "description": "Please select one of notify service:", + "title": "Set up one-time password delivered by notify component" + }, + "setup": { + "description": "A one-time password have sent by **notify.{notify_service}**. Please input it in below:", + "title": "Verify setup" + } + }, + "title": "Notify One-Time Password" + }, "totp": { "error": { "invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock of your Home Assistant system is accurate." diff --git a/homeassistant/components/auth/login_flow.py b/homeassistant/components/auth/login_flow.py index 73a739c2960..3a51cf8066f 100644 --- a/homeassistant/components/auth/login_flow.py +++ b/homeassistant/components/auth/login_flow.py @@ -226,8 +226,9 @@ class LoginFlowResourceView(HomeAssistantView): if result['type'] != data_entry_flow.RESULT_TYPE_CREATE_ENTRY: # @log_invalid_auth does not work here since it returns HTTP 200 # need manually log failed login attempts - if result['errors'] is not None and \ - result['errors'].get('base') == 'invalid_auth': + if (result.get('errors') is not None and + result['errors'].get('base') in ['invalid_auth', + 'invalid_code']): await process_wrong_login(request) return self.json(_prepare_result_json(result)) diff --git a/homeassistant/components/auth/strings.json b/homeassistant/components/auth/strings.json index b0083ab577b..2b1fc0c94f6 100644 --- a/homeassistant/components/auth/strings.json +++ b/homeassistant/components/auth/strings.json @@ -11,6 +11,25 @@ "error": { "invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock of your Home Assistant system is accurate." } + }, + "notify": { + "title": "Notify One-Time Password", + "step": { + "init": { + "title": "Set up one-time password delivered by notify component", + "description": "Please select one of notify service:" + }, + "setup": { + "title": "Verify setup", + "description": "A one-time password have sent by **notify.{notify_service}**. Please input it in below:" + } + }, + "abort": { + "no_available_service": "No available notify services." + }, + "error": { + "invalid_code": "Invalid code, please try again." + } } } } diff --git a/homeassistant/config.py b/homeassistant/config.py index abcf027c49f..98857d8a83d 100644 --- a/homeassistant/config.py +++ b/homeassistant/config.py @@ -475,7 +475,7 @@ async def async_process_ha_core_config( auth_conf.append({'type': 'trusted_networks'}) mfa_conf = config.get(CONF_AUTH_MFA_MODULES, [ - {'type': 'totp', 'id': 'totp', 'name': 'Authenticator app'} + {'type': 'totp', 'id': 'totp', 'name': 'Authenticator app'}, ]) setattr(hass, 'auth', await auth.auth_manager_from_config( diff --git a/requirements_all.txt b/requirements_all.txt index ba353af0d48..70c05090fb0 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1020,6 +1020,7 @@ pyota==2.0.5 # homeassistant.components.climate.opentherm_gw pyotgw==0.1b0 +# homeassistant.auth.mfa_modules.notify # homeassistant.auth.mfa_modules.totp # homeassistant.components.sensor.otp pyotp==2.2.6 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 171650b867d..133b97c2687 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -160,6 +160,7 @@ pynx584==0.4 # homeassistant.components.openuv pyopenuv==1.0.4 +# homeassistant.auth.mfa_modules.notify # homeassistant.auth.mfa_modules.totp # homeassistant.components.sensor.otp pyotp==2.2.6 diff --git a/tests/auth/mfa_modules/test_insecure_example.py b/tests/auth/mfa_modules/test_insecure_example.py index 80109627140..d9330d5f6e8 100644 --- a/tests/auth/mfa_modules/test_insecure_example.py +++ b/tests/auth/mfa_modules/test_insecure_example.py @@ -12,15 +12,15 @@ async def test_validate(hass): 'data': [{'user_id': 'test-user', 'pin': '123456'}] }) - result = await auth_module.async_validation( + result = await auth_module.async_validate( 'test-user', {'pin': '123456'}) assert result is True - result = await auth_module.async_validation( + result = await auth_module.async_validate( 'test-user', {'pin': 'invalid'}) assert result is False - result = await auth_module.async_validation( + result = await auth_module.async_validate( 'invalid-user', {'pin': '123456'}) assert result is False @@ -36,7 +36,7 @@ async def test_setup_user(hass): 'test-user', {'pin': '123456'}) assert len(auth_module._data) == 1 - result = await auth_module.async_validation( + result = await auth_module.async_validate( 'test-user', {'pin': '123456'}) assert result is True diff --git a/tests/auth/mfa_modules/test_notify.py b/tests/auth/mfa_modules/test_notify.py new file mode 100644 index 00000000000..ffe0b103fc9 --- /dev/null +++ b/tests/auth/mfa_modules/test_notify.py @@ -0,0 +1,397 @@ +"""Test the HMAC-based One Time Password (MFA) auth module.""" +from unittest.mock import patch + +from homeassistant import data_entry_flow +from homeassistant.auth import models as auth_models, auth_manager_from_config +from homeassistant.auth.mfa_modules import auth_mfa_module_from_config +from homeassistant.components.notify import NOTIFY_SERVICE_SCHEMA +from tests.common import MockUser, async_mock_service + +MOCK_CODE = '123456' +MOCK_CODE_2 = '654321' + + +async def test_validating_mfa(hass): + """Test validating mfa code.""" + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify' + }) + await notify_auth_module.async_setup_user('test-user', { + 'notify_service': 'dummy' + }) + + with patch('pyotp.HOTP.verify', return_value=True): + assert await notify_auth_module.async_validate( + 'test-user', {'code': MOCK_CODE}) + + +async def test_validating_mfa_invalid_code(hass): + """Test validating an invalid mfa code.""" + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify' + }) + await notify_auth_module.async_setup_user('test-user', { + 'notify_service': 'dummy' + }) + + with patch('pyotp.HOTP.verify', return_value=False): + assert await notify_auth_module.async_validate( + 'test-user', {'code': MOCK_CODE}) is False + + +async def test_validating_mfa_invalid_user(hass): + """Test validating an mfa code with invalid user.""" + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify' + }) + await notify_auth_module.async_setup_user('test-user', { + 'notify_service': 'dummy' + }) + + assert await notify_auth_module.async_validate( + 'invalid-user', {'code': MOCK_CODE}) is False + + +async def test_validating_mfa_counter(hass): + """Test counter will move only after generate code.""" + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify' + }) + await notify_auth_module.async_setup_user('test-user', { + 'counter': 0, + 'notify_service': 'dummy', + }) + + assert notify_auth_module._user_settings + notify_setting = list(notify_auth_module._user_settings.values())[0] + init_count = notify_setting.counter + assert init_count is not None + + with patch('pyotp.HOTP.at', return_value=MOCK_CODE): + await notify_auth_module.async_initialize_login_mfa_step('test-user') + + notify_setting = list(notify_auth_module._user_settings.values())[0] + after_generate_count = notify_setting.counter + assert after_generate_count != init_count + + with patch('pyotp.HOTP.verify', return_value=True): + assert await notify_auth_module.async_validate( + 'test-user', {'code': MOCK_CODE}) + + notify_setting = list(notify_auth_module._user_settings.values())[0] + assert after_generate_count == notify_setting.counter + + with patch('pyotp.HOTP.verify', return_value=False): + assert await notify_auth_module.async_validate( + 'test-user', {'code': MOCK_CODE}) is False + + notify_setting = list(notify_auth_module._user_settings.values())[0] + assert after_generate_count == notify_setting.counter + + +async def test_setup_depose_user(hass): + """Test set up and despose user.""" + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify' + }) + await notify_auth_module.async_setup_user('test-user', {}) + assert len(notify_auth_module._user_settings) == 1 + await notify_auth_module.async_setup_user('test-user', {}) + assert len(notify_auth_module._user_settings) == 1 + + await notify_auth_module.async_depose_user('test-user') + assert len(notify_auth_module._user_settings) == 0 + + await notify_auth_module.async_setup_user( + 'test-user2', {'secret': 'secret-code'}) + assert len(notify_auth_module._user_settings) == 1 + + +async def test_login_flow_validates_mfa(hass): + """Test login flow with mfa enabled.""" + hass.auth = await auth_manager_from_config(hass, [{ + 'type': 'insecure_example', + 'users': [{'username': 'test-user', 'password': 'test-pass'}], + }], [{ + 'type': 'notify', + }]) + user = MockUser( + id='mock-user', + is_owner=False, + is_active=False, + name='Paulus', + ).add_to_auth_manager(hass.auth) + await hass.auth.async_link_user(user, auth_models.Credentials( + id='mock-id', + auth_provider_type='insecure_example', + auth_provider_id=None, + data={'username': 'test-user'}, + is_new=False, + )) + + notify_calls = async_mock_service(hass, 'notify', 'test-notify', + NOTIFY_SERVICE_SCHEMA) + + await hass.auth.async_enable_user_mfa(user, 'notify', { + 'notify_service': 'test-notify', + }) + + provider = hass.auth.auth_providers[0] + + result = await hass.auth.login_flow.async_init( + (provider.type, provider.id)) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + + result = await hass.auth.login_flow.async_configure(result['flow_id'], { + 'username': 'incorrect-user', + 'password': 'test-pass', + }) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['errors']['base'] == 'invalid_auth' + + result = await hass.auth.login_flow.async_configure(result['flow_id'], { + 'username': 'test-user', + 'password': 'incorrect-pass', + }) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['errors']['base'] == 'invalid_auth' + + with patch('pyotp.HOTP.at', return_value=MOCK_CODE): + result = await hass.auth.login_flow.async_configure( + result['flow_id'], + { + 'username': 'test-user', + 'password': 'test-pass', + }) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['step_id'] == 'mfa' + assert result['data_schema'].schema.get('code') == str + + # wait service call finished + await hass.async_block_till_done() + + assert len(notify_calls) == 1 + notify_call = notify_calls[0] + assert notify_call.domain == 'notify' + assert notify_call.service == 'test-notify' + message = notify_call.data['message'] + message.hass = hass + assert MOCK_CODE in message.async_render() + + with patch('pyotp.HOTP.verify', return_value=False): + result = await hass.auth.login_flow.async_configure( + result['flow_id'], {'code': 'invalid-code'}) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['step_id'] == 'mfa' + assert result['errors']['base'] == 'invalid_code' + + # wait service call finished + await hass.async_block_till_done() + + # would not send new code, allow user retry + assert len(notify_calls) == 1 + + # retry twice + with patch('pyotp.HOTP.verify', return_value=False), \ + patch('pyotp.HOTP.at', return_value=MOCK_CODE_2): + result = await hass.auth.login_flow.async_configure( + result['flow_id'], {'code': 'invalid-code'}) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['step_id'] == 'mfa' + assert result['errors']['base'] == 'invalid_code' + + # after the 3rd failure, flow abort + result = await hass.auth.login_flow.async_configure( + result['flow_id'], {'code': 'invalid-code'}) + assert result['type'] == data_entry_flow.RESULT_TYPE_ABORT + assert result['reason'] == 'too_many_retry' + + # wait service call finished + await hass.async_block_till_done() + + # restart login + result = await hass.auth.login_flow.async_init( + (provider.type, provider.id)) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + + with patch('pyotp.HOTP.at', return_value=MOCK_CODE): + result = await hass.auth.login_flow.async_configure( + result['flow_id'], + { + 'username': 'test-user', + 'password': 'test-pass', + }) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['step_id'] == 'mfa' + assert result['data_schema'].schema.get('code') == str + + # wait service call finished + await hass.async_block_till_done() + + assert len(notify_calls) == 2 + notify_call = notify_calls[1] + assert notify_call.domain == 'notify' + assert notify_call.service == 'test-notify' + message = notify_call.data['message'] + message.hass = hass + assert MOCK_CODE in message.async_render() + + with patch('pyotp.HOTP.verify', return_value=True): + result = await hass.auth.login_flow.async_configure( + result['flow_id'], {'code': MOCK_CODE}) + assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result['data'].id == 'mock-user' + + +async def test_setup_user_notify_service(hass): + """Test allow select notify service during mfa setup.""" + notify_calls = async_mock_service( + hass, 'notify', 'test1', NOTIFY_SERVICE_SCHEMA) + async_mock_service(hass, 'notify', 'test2', NOTIFY_SERVICE_SCHEMA) + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify', + }) + + services = notify_auth_module.aync_get_available_notify_services() + assert services == ['test1', 'test2'] + + flow = await notify_auth_module.async_setup_flow('test-user') + step = await flow.async_step_init() + assert step['type'] == data_entry_flow.RESULT_TYPE_FORM + assert step['step_id'] == 'init' + schema = step['data_schema'] + schema({'notify_service': 'test2'}) + + with patch('pyotp.HOTP.at', return_value=MOCK_CODE): + step = await flow.async_step_init({'notify_service': 'test1'}) + assert step['type'] == data_entry_flow.RESULT_TYPE_FORM + assert step['step_id'] == 'setup' + + # wait service call finished + await hass.async_block_till_done() + + assert len(notify_calls) == 1 + notify_call = notify_calls[0] + assert notify_call.domain == 'notify' + assert notify_call.service == 'test1' + message = notify_call.data['message'] + message.hass = hass + assert MOCK_CODE in message.async_render() + + with patch('pyotp.HOTP.at', return_value=MOCK_CODE_2): + step = await flow.async_step_setup({'code': 'invalid'}) + assert step['type'] == data_entry_flow.RESULT_TYPE_FORM + assert step['step_id'] == 'setup' + assert step['errors']['base'] == 'invalid_code' + + # wait service call finished + await hass.async_block_till_done() + + assert len(notify_calls) == 2 + notify_call = notify_calls[1] + assert notify_call.domain == 'notify' + assert notify_call.service == 'test1' + message = notify_call.data['message'] + message.hass = hass + assert MOCK_CODE_2 in message.async_render() + + with patch('pyotp.HOTP.verify', return_value=True): + step = await flow.async_step_setup({'code': MOCK_CODE_2}) + assert step['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + + +async def test_include_exclude_config(hass): + """Test allow include exclude config.""" + async_mock_service(hass, 'notify', 'include1', NOTIFY_SERVICE_SCHEMA) + async_mock_service(hass, 'notify', 'include2', NOTIFY_SERVICE_SCHEMA) + async_mock_service(hass, 'notify', 'exclude1', NOTIFY_SERVICE_SCHEMA) + async_mock_service(hass, 'notify', 'exclude2', NOTIFY_SERVICE_SCHEMA) + async_mock_service(hass, 'other', 'include3', NOTIFY_SERVICE_SCHEMA) + async_mock_service(hass, 'other', 'exclude3', NOTIFY_SERVICE_SCHEMA) + + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify', + 'exclude': ['exclude1', 'exclude2', 'exclude3'], + }) + services = notify_auth_module.aync_get_available_notify_services() + assert services == ['include1', 'include2'] + + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify', + 'include': ['include1', 'include2', 'include3'], + }) + services = notify_auth_module.aync_get_available_notify_services() + assert services == ['include1', 'include2'] + + # exclude has high priority than include + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify', + 'include': ['include1', 'include2', 'include3'], + 'exclude': ['exclude1', 'exclude2', 'include2'], + }) + services = notify_auth_module.aync_get_available_notify_services() + assert services == ['include1'] + + +async def test_setup_user_no_notify_service(hass): + """Test setup flow abort if there is no avilable notify service.""" + async_mock_service(hass, 'notify', 'test1', NOTIFY_SERVICE_SCHEMA) + notify_auth_module = await auth_mfa_module_from_config(hass, { + 'type': 'notify', + 'exclude': 'test1', + }) + + services = notify_auth_module.aync_get_available_notify_services() + assert services == [] + + flow = await notify_auth_module.async_setup_flow('test-user') + step = await flow.async_step_init() + assert step['type'] == data_entry_flow.RESULT_TYPE_ABORT + assert step['reason'] == 'no_available_service' + + +async def test_not_raise_exception_when_service_not_exist(hass): + """Test login flow will not raise exception when notify service error.""" + hass.auth = await auth_manager_from_config(hass, [{ + 'type': 'insecure_example', + 'users': [{'username': 'test-user', 'password': 'test-pass'}], + }], [{ + 'type': 'notify', + }]) + user = MockUser( + id='mock-user', + is_owner=False, + is_active=False, + name='Paulus', + ).add_to_auth_manager(hass.auth) + await hass.auth.async_link_user(user, auth_models.Credentials( + id='mock-id', + auth_provider_type='insecure_example', + auth_provider_id=None, + data={'username': 'test-user'}, + is_new=False, + )) + + await hass.auth.async_enable_user_mfa(user, 'notify', { + 'notify_service': 'invalid-notify', + }) + + provider = hass.auth.auth_providers[0] + + result = await hass.auth.login_flow.async_init( + (provider.type, provider.id)) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + + with patch('pyotp.HOTP.at', return_value=MOCK_CODE): + result = await hass.auth.login_flow.async_configure( + result['flow_id'], + { + 'username': 'test-user', + 'password': 'test-pass', + }) + assert result['type'] == data_entry_flow.RESULT_TYPE_FORM + assert result['step_id'] == 'mfa' + assert result['data_schema'].schema.get('code') == str + + # wait service call finished + await hass.async_block_till_done() diff --git a/tests/auth/mfa_modules/test_totp.py b/tests/auth/mfa_modules/test_totp.py index 6e3558ec549..d400fe80672 100644 --- a/tests/auth/mfa_modules/test_totp.py +++ b/tests/auth/mfa_modules/test_totp.py @@ -17,7 +17,7 @@ async def test_validating_mfa(hass): await totp_auth_module.async_setup_user('test-user', {}) with patch('pyotp.TOTP.verify', return_value=True): - assert await totp_auth_module.async_validation( + assert await totp_auth_module.async_validate( 'test-user', {'code': MOCK_CODE}) @@ -29,7 +29,7 @@ async def test_validating_mfa_invalid_code(hass): await totp_auth_module.async_setup_user('test-user', {}) with patch('pyotp.TOTP.verify', return_value=False): - assert await totp_auth_module.async_validation( + assert await totp_auth_module.async_validate( 'test-user', {'code': MOCK_CODE}) is False @@ -40,7 +40,7 @@ async def test_validating_mfa_invalid_user(hass): }) await totp_auth_module.async_setup_user('test-user', {}) - assert await totp_auth_module.async_validation( + assert await totp_auth_module.async_validate( 'invalid-user', {'code': MOCK_CODE}) is False diff --git a/tests/auth/test_init.py b/tests/auth/test_init.py index 8325bd2551a..8fd9b8930e4 100644 --- a/tests/auth/test_init.py +++ b/tests/auth/test_init.py @@ -9,7 +9,7 @@ import voluptuous as vol from homeassistant import auth, data_entry_flow from homeassistant.auth import ( models as auth_models, auth_store, const as auth_const) -from homeassistant.auth.mfa_modules import SESSION_EXPIRATION +from homeassistant.auth.const import MFA_SESSION_EXPIRATION from homeassistant.util import dt as dt_util from tests.common import ( MockUser, ensure_auth_manager_loaded, flush_store, CLIENT_ID) @@ -720,7 +720,7 @@ async def test_auth_module_expired_session(mock_hass): assert step['step_id'] == 'mfa' with patch('homeassistant.util.dt.utcnow', - return_value=dt_util.utcnow() + SESSION_EXPIRATION): + return_value=dt_util.utcnow() + MFA_SESSION_EXPIRATION): step = await manager.login_flow.async_configure(step['flow_id'], { 'pin': 'test-pin', })