Long-lived access token (#16453)

* Allow create refresh_token with specific access_token_expiration

* Add token_type, client_name and client_icon

* Add unit test

* Add websocket API to create long-lived access token

* Allow URL use as client_id for long-lived access token

* Remove mutate_refresh_token method

* Use client name as id for long_lived_access_token type refresh token

* Minor change

* Do not allow duplicate client name

* Update docstring

* Remove unnecessary `list`
This commit is contained in:
Jason Hu 2018-09-11 03:05:15 -07:00 committed by Paulus Schoutsen
parent 50fb59477a
commit 9583947012
7 changed files with 385 additions and 16 deletions

View File

@ -2,11 +2,13 @@
import asyncio import asyncio
import logging import logging
from collections import OrderedDict from collections import OrderedDict
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple, cast from typing import Any, Dict, List, Optional, Tuple, cast
import jwt import jwt
from homeassistant import data_entry_flow from homeassistant import data_entry_flow
from homeassistant.auth.const import ACCESS_TOKEN_EXPIRATION
from homeassistant.core import callback, HomeAssistant from homeassistant.core import callback, HomeAssistant
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
@ -242,8 +244,12 @@ class AuthManager:
modules[module_id] = module.name modules[module_id] = module.name
return modules return modules
async def async_create_refresh_token(self, user: models.User, async def async_create_refresh_token(
client_id: Optional[str] = None) \ self, user: models.User, client_id: Optional[str] = None,
client_name: Optional[str] = None,
client_icon: Optional[str] = None,
token_type: Optional[str] = None,
access_token_expiration: timedelta = ACCESS_TOKEN_EXPIRATION) \
-> models.RefreshToken: -> models.RefreshToken:
"""Create a new refresh token for a user.""" """Create a new refresh token for a user."""
if not user.is_active: if not user.is_active:
@ -254,10 +260,36 @@ class AuthManager:
'System generated users cannot have refresh tokens connected ' 'System generated users cannot have refresh tokens connected '
'to a client.') 'to a client.')
if not user.system_generated and client_id is None: if token_type is None:
if user.system_generated:
token_type = models.TOKEN_TYPE_SYSTEM
else:
token_type = models.TOKEN_TYPE_NORMAL
if user.system_generated != (token_type == models.TOKEN_TYPE_SYSTEM):
raise ValueError(
'System generated users can only have system type '
'refresh tokens')
if token_type == models.TOKEN_TYPE_NORMAL and client_id is None:
raise ValueError('Client is required to generate a refresh token.') raise ValueError('Client is required to generate a refresh token.')
return await self._store.async_create_refresh_token(user, client_id) if (token_type == models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN and
client_name is None):
raise ValueError('Client_name is required for long-lived access '
'token')
if token_type == models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN:
for token in user.refresh_tokens.values():
if (token.client_name == client_name and token.token_type ==
models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN):
# Each client_name can only have one
# long_lived_access_token type of refresh token
raise ValueError('{} already exists'.format(client_name))
return await self._store.async_create_refresh_token(
user, client_id, client_name, client_icon,
token_type, access_token_expiration)
async def async_get_refresh_token( async def async_get_refresh_token(
self, token_id: str) -> Optional[models.RefreshToken]: self, token_id: str) -> Optional[models.RefreshToken]:
@ -280,10 +312,11 @@ class AuthManager:
refresh_token: models.RefreshToken) -> str: refresh_token: models.RefreshToken) -> str:
"""Create a new access token.""" """Create a new access token."""
# pylint: disable=no-self-use # pylint: disable=no-self-use
now = dt_util.utcnow()
return jwt.encode({ return jwt.encode({
'iss': refresh_token.id, 'iss': refresh_token.id,
'iat': dt_util.utcnow(), 'iat': now,
'exp': dt_util.utcnow() + refresh_token.access_token_expiration, 'exp': now + refresh_token.access_token_expiration,
}, refresh_token.jwt_key, algorithm='HS256').decode() }, refresh_token.jwt_key, algorithm='HS256').decode()
async def async_validate_access_token( async def async_validate_access_token(

View File

@ -5,6 +5,7 @@ from logging import getLogger
from typing import Any, Dict, List, Optional # noqa: F401 from typing import Any, Dict, List, Optional # noqa: F401
import hmac import hmac
from homeassistant.auth.const import ACCESS_TOKEN_EXPIRATION
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
@ -128,11 +129,27 @@ class AuthStore:
self._async_schedule_save() self._async_schedule_save()
async def async_create_refresh_token( async def async_create_refresh_token(
self, user: models.User, client_id: Optional[str] = None) \ self, user: models.User, client_id: Optional[str] = None,
client_name: Optional[str] = None,
client_icon: Optional[str] = None,
token_type: str = models.TOKEN_TYPE_NORMAL,
access_token_expiration: timedelta = ACCESS_TOKEN_EXPIRATION) \
-> models.RefreshToken: -> models.RefreshToken:
"""Create a new token for a user.""" """Create a new token for a user."""
refresh_token = models.RefreshToken(user=user, client_id=client_id) kwargs = {
'user': user,
'client_id': client_id,
'token_type': token_type,
'access_token_expiration': access_token_expiration
} # type: Dict[str, Any]
if client_name:
kwargs['client_name'] = client_name
if client_icon:
kwargs['client_icon'] = client_icon
refresh_token = models.RefreshToken(**kwargs)
user.refresh_tokens[refresh_token.id] = refresh_token user.refresh_tokens[refresh_token.id] = refresh_token
self._async_schedule_save() self._async_schedule_save()
return refresh_token return refresh_token
@ -216,10 +233,20 @@ class AuthStore:
'Ignoring refresh token %(id)s with invalid created_at ' 'Ignoring refresh token %(id)s with invalid created_at '
'%(created_at)s for user_id %(user_id)s', rt_dict) '%(created_at)s for user_id %(user_id)s', rt_dict)
continue continue
token_type = rt_dict.get('token_type')
if token_type is None:
if rt_dict['clinet_id'] is None:
token_type = models.TOKEN_TYPE_SYSTEM
else:
token_type = models.TOKEN_TYPE_NORMAL
token = models.RefreshToken( token = models.RefreshToken(
id=rt_dict['id'], id=rt_dict['id'],
user=users[rt_dict['user_id']], user=users[rt_dict['user_id']],
client_id=rt_dict['client_id'], client_id=rt_dict['client_id'],
# use dict.get to keep backward compatibility
client_name=rt_dict.get('client_name'),
client_icon=rt_dict.get('client_icon'),
token_type=token_type,
created_at=created_at, created_at=created_at,
access_token_expiration=timedelta( access_token_expiration=timedelta(
seconds=rt_dict['access_token_expiration']), seconds=rt_dict['access_token_expiration']),
@ -271,6 +298,9 @@ class AuthStore:
'id': refresh_token.id, 'id': refresh_token.id,
'user_id': user.id, 'user_id': user.id,
'client_id': refresh_token.client_id, 'client_id': refresh_token.client_id,
'client_name': refresh_token.client_name,
'client_icon': refresh_token.client_icon,
'token_type': refresh_token.token_type,
'created_at': refresh_token.created_at.isoformat(), 'created_at': refresh_token.created_at.isoformat(),
'access_token_expiration': 'access_token_expiration':
refresh_token.access_token_expiration.total_seconds(), refresh_token.access_token_expiration.total_seconds(),

View File

@ -7,9 +7,12 @@ import attr
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from .const import ACCESS_TOKEN_EXPIRATION
from .util import generate_secret from .util import generate_secret
TOKEN_TYPE_NORMAL = 'normal'
TOKEN_TYPE_SYSTEM = 'system'
TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN = 'long_lived_access_token'
@attr.s(slots=True) @attr.s(slots=True)
class User: class User:
@ -37,11 +40,16 @@ class RefreshToken:
"""RefreshToken for a user to grant new access tokens.""" """RefreshToken for a user to grant new access tokens."""
user = attr.ib(type=User) user = attr.ib(type=User)
client_id = attr.ib(type=str) # type: Optional[str] client_id = attr.ib(type=Optional[str])
access_token_expiration = attr.ib(type=timedelta)
client_name = attr.ib(type=Optional[str], default=None)
client_icon = attr.ib(type=Optional[str], default=None)
token_type = attr.ib(type=str, default=TOKEN_TYPE_NORMAL,
validator=attr.validators.in_((
TOKEN_TYPE_NORMAL, TOKEN_TYPE_SYSTEM,
TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN)))
id = attr.ib(type=str, default=attr.Factory(lambda: uuid.uuid4().hex)) id = attr.ib(type=str, default=attr.Factory(lambda: uuid.uuid4().hex))
created_at = attr.ib(type=datetime, default=attr.Factory(dt_util.utcnow)) created_at = attr.ib(type=datetime, default=attr.Factory(dt_util.utcnow))
access_token_expiration = attr.ib(type=timedelta,
default=ACCESS_TOKEN_EXPIRATION)
token = attr.ib(type=str, token = attr.ib(type=str,
default=attr.Factory(lambda: generate_secret(64))) default=attr.Factory(lambda: generate_secret(64)))
jwt_key = attr.ib(type=str, jwt_key = attr.ib(type=str,

View File

@ -12,6 +12,7 @@ be in JSON as it's more readable.
Exchange the authorization code retrieved from the login flow for tokens. Exchange the authorization code retrieved from the login flow for tokens.
{ {
"client_id": "https://hassbian.local:8123/",
"grant_type": "authorization_code", "grant_type": "authorization_code",
"code": "411ee2f916e648d691e937ae9344681e" "code": "411ee2f916e648d691e937ae9344681e"
} }
@ -32,6 +33,7 @@ token.
Request a new access token using a refresh token. Request a new access token using a refresh token.
{ {
"client_id": "https://hassbian.local:8123/",
"grant_type": "refresh_token", "grant_type": "refresh_token",
"refresh_token": "IJKLMNOPQRST" "refresh_token": "IJKLMNOPQRST"
} }
@ -55,6 +57,67 @@ ever been granted by that refresh token. Response code will ALWAYS be 200.
"action": "revoke" "action": "revoke"
} }
# Websocket API
## Get current user
Send websocket command `auth/current_user` will return current user of the
active websocket connection.
{
"id": 10,
"type": "auth/current_user",
}
The result payload likes
{
"id": 10,
"type": "result",
"success": true,
"result": {
"id": "USER_ID",
"name": "John Doe",
"is_owner': true,
"credentials": [
{
"auth_provider_type": "homeassistant",
"auth_provider_id": null
}
],
"mfa_modules": [
{
"id": "totp",
"name": "TOTP",
"enabled": true,
}
]
}
}
## Create a long-lived access token
Send websocket command `auth/long_lived_access_token` will create
a long-lived access token for current user. Access token will not be saved in
Home Assistant. User need to record the token in secure place.
{
"id": 11,
"type": "auth/long_lived_access_token",
"client_name": "GPS Logger",
"client_icon": null,
"lifespan": 365
}
Result will be a long-lived access token:
{
"id": 11,
"type": "result",
"success": true,
"result": "ABCDEFGH"
}
""" """
import logging import logging
import uuid import uuid
@ -63,7 +126,8 @@ from datetime import timedelta
from aiohttp import web from aiohttp import web
import voluptuous as vol import voluptuous as vol
from homeassistant.auth.models import User, Credentials from homeassistant.auth.models import User, Credentials, \
TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
from homeassistant.components import websocket_api from homeassistant.components import websocket_api
from homeassistant.components.http.ban import log_invalid_auth from homeassistant.components.http.ban import log_invalid_auth
from homeassistant.components.http.data_validator import RequestDataValidator from homeassistant.components.http.data_validator import RequestDataValidator
@ -83,6 +147,15 @@ SCHEMA_WS_CURRENT_USER = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({
vol.Required('type'): WS_TYPE_CURRENT_USER, vol.Required('type'): WS_TYPE_CURRENT_USER,
}) })
WS_TYPE_LONG_LIVED_ACCESS_TOKEN = 'auth/long_lived_access_token'
SCHEMA_WS_LONG_LIVED_ACCESS_TOKEN = \
websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend({
vol.Required('type'): WS_TYPE_LONG_LIVED_ACCESS_TOKEN,
vol.Required('lifespan'): int, # days
vol.Required('client_name'): str,
vol.Optional('client_icon'): str,
})
RESULT_TYPE_CREDENTIALS = 'credentials' RESULT_TYPE_CREDENTIALS = 'credentials'
RESULT_TYPE_USER = 'user' RESULT_TYPE_USER = 'user'
@ -100,6 +173,11 @@ async def async_setup(hass, config):
WS_TYPE_CURRENT_USER, websocket_current_user, WS_TYPE_CURRENT_USER, websocket_current_user,
SCHEMA_WS_CURRENT_USER SCHEMA_WS_CURRENT_USER
) )
hass.components.websocket_api.async_register_command(
WS_TYPE_LONG_LIVED_ACCESS_TOKEN,
websocket_create_long_lived_access_token,
SCHEMA_WS_LONG_LIVED_ACCESS_TOKEN
)
await login_flow.async_setup(hass, store_result) await login_flow.async_setup(hass, store_result)
await mfa_setup_flow.async_setup(hass) await mfa_setup_flow.async_setup(hass)
@ -343,3 +421,27 @@ def websocket_current_user(
})) }))
hass.async_create_task(async_get_current_user(connection.user)) hass.async_create_task(async_get_current_user(connection.user))
@websocket_api.ws_require_user()
@callback
def websocket_create_long_lived_access_token(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg):
"""Create or a long-lived access token."""
async def async_create_long_lived_access_token(user):
"""Create or a long-lived access token."""
refresh_token = await hass.auth.async_create_refresh_token(
user,
client_name=msg['client_name'],
client_icon=msg.get('client_icon'),
token_type=TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
access_token_expiration=timedelta(days=msg['lifespan']))
access_token = hass.auth.async_create_access_token(
refresh_token)
connection.send_message_outside(
websocket_api.result_message(msg['id'], access_token))
hass.async_create_task(
async_create_long_lived_access_token(connection.user))

View File

@ -2,6 +2,7 @@
from datetime import timedelta from datetime import timedelta
from unittest.mock import Mock, patch from unittest.mock import Mock, patch
import jwt
import pytest import pytest
import voluptuous as vol import voluptuous as vol
@ -323,7 +324,7 @@ async def test_generating_system_user(hass):
async def test_refresh_token_requires_client_for_user(hass): async def test_refresh_token_requires_client_for_user(hass):
"""Test that we can add a system user.""" """Test create refresh token for a user with client_id."""
manager = await auth.auth_manager_from_config(hass, [], []) manager = await auth.auth_manager_from_config(hass, [], [])
user = MockUser().add_to_auth_manager(manager) user = MockUser().add_to_auth_manager(manager)
assert user.system_generated is False assert user.system_generated is False
@ -334,10 +335,14 @@ async def test_refresh_token_requires_client_for_user(hass):
token = await manager.async_create_refresh_token(user, CLIENT_ID) token = await manager.async_create_refresh_token(user, CLIENT_ID)
assert token is not None assert token is not None
assert token.client_id == CLIENT_ID assert token.client_id == CLIENT_ID
assert token.token_type == auth_models.TOKEN_TYPE_NORMAL
# default access token expiration
assert token.access_token_expiration == \
auth_const.ACCESS_TOKEN_EXPIRATION
async def test_refresh_token_not_requires_client_for_system_user(hass): async def test_refresh_token_not_requires_client_for_system_user(hass):
"""Test that we can add a system user.""" """Test create refresh token for a system user w/o client_id."""
manager = await auth.auth_manager_from_config(hass, [], []) manager = await auth.auth_manager_from_config(hass, [], [])
user = await manager.async_create_system_user('Hass.io') user = await manager.async_create_system_user('Hass.io')
assert user.system_generated is True assert user.system_generated is True
@ -348,6 +353,56 @@ async def test_refresh_token_not_requires_client_for_system_user(hass):
token = await manager.async_create_refresh_token(user) token = await manager.async_create_refresh_token(user)
assert token is not None assert token is not None
assert token.client_id is None assert token.client_id is None
assert token.token_type == auth_models.TOKEN_TYPE_SYSTEM
async def test_refresh_token_with_specific_access_token_expiration(hass):
"""Test create a refresh token with specific access token expiration."""
manager = await auth.auth_manager_from_config(hass, [], [])
user = MockUser().add_to_auth_manager(manager)
token = await manager.async_create_refresh_token(
user, CLIENT_ID,
access_token_expiration=timedelta(days=100))
assert token is not None
assert token.client_id == CLIENT_ID
assert token.access_token_expiration == timedelta(days=100)
async def test_refresh_token_type(hass):
"""Test create a refresh token with token type."""
manager = await auth.auth_manager_from_config(hass, [], [])
user = MockUser().add_to_auth_manager(manager)
with pytest.raises(ValueError):
await manager.async_create_refresh_token(
user, CLIENT_ID, token_type=auth_models.TOKEN_TYPE_SYSTEM)
token = await manager.async_create_refresh_token(
user, CLIENT_ID,
token_type=auth_models.TOKEN_TYPE_NORMAL)
assert token is not None
assert token.client_id == CLIENT_ID
assert token.token_type == auth_models.TOKEN_TYPE_NORMAL
async def test_refresh_token_type_long_lived_access_token(hass):
"""Test create a refresh token has long-lived access token type."""
manager = await auth.auth_manager_from_config(hass, [], [])
user = MockUser().add_to_auth_manager(manager)
with pytest.raises(ValueError):
await manager.async_create_refresh_token(
user, token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN)
token = await manager.async_create_refresh_token(
user, client_name='GPS LOGGER', client_icon='mdi:home',
token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN)
assert token is not None
assert token.client_id is None
assert token.client_name == 'GPS LOGGER'
assert token.client_icon == 'mdi:home'
assert token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
async def test_cannot_deactive_owner(mock_hass): async def test_cannot_deactive_owner(mock_hass):
@ -378,6 +433,88 @@ async def test_remove_refresh_token(mock_hass):
) )
async def test_create_access_token(mock_hass):
"""Test normal refresh_token's jwt_key keep same after used."""
manager = await auth.auth_manager_from_config(mock_hass, [], [])
user = MockUser().add_to_auth_manager(manager)
refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
assert refresh_token.token_type == auth_models.TOKEN_TYPE_NORMAL
jwt_key = refresh_token.jwt_key
access_token = manager.async_create_access_token(refresh_token)
assert access_token is not None
assert refresh_token.jwt_key == jwt_key
jwt_payload = jwt.decode(access_token, jwt_key, algorithm=['HS256'])
assert jwt_payload['iss'] == refresh_token.id
assert jwt_payload['exp'] - jwt_payload['iat'] == \
timedelta(minutes=30).total_seconds()
async def test_create_long_lived_access_token(mock_hass):
"""Test refresh_token's jwt_key changed for long-lived access token."""
manager = await auth.auth_manager_from_config(mock_hass, [], [])
user = MockUser().add_to_auth_manager(manager)
refresh_token = await manager.async_create_refresh_token(
user, client_name='GPS Logger',
token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
access_token_expiration=timedelta(days=300))
assert refresh_token.token_type == \
auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
access_token = manager.async_create_access_token(refresh_token)
jwt_payload = jwt.decode(
access_token, refresh_token.jwt_key, algorithm=['HS256'])
assert jwt_payload['iss'] == refresh_token.id
assert jwt_payload['exp'] - jwt_payload['iat'] == \
timedelta(days=300).total_seconds()
async def test_one_long_lived_access_token_per_refresh_token(mock_hass):
"""Test one refresh_token can only have one long-lived access token."""
manager = await auth.auth_manager_from_config(mock_hass, [], [])
user = MockUser().add_to_auth_manager(manager)
refresh_token = await manager.async_create_refresh_token(
user, client_name='GPS Logger',
token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
access_token_expiration=timedelta(days=3000))
assert refresh_token.token_type == \
auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
access_token = manager.async_create_access_token(refresh_token)
jwt_key = refresh_token.jwt_key
rt = await manager.async_validate_access_token(access_token)
assert rt.id == refresh_token.id
with pytest.raises(ValueError):
await manager.async_create_refresh_token(
user, client_name='GPS Logger',
token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
access_token_expiration=timedelta(days=3000))
await manager.async_remove_refresh_token(refresh_token)
assert refresh_token.id not in user.refresh_tokens
rt = await manager.async_validate_access_token(access_token)
assert rt is None, 'Previous issued access token has been invoked'
refresh_token_2 = await manager.async_create_refresh_token(
user, client_name='GPS Logger',
token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
access_token_expiration=timedelta(days=3000))
assert refresh_token_2.id != refresh_token.id
assert refresh_token_2.token_type == \
auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
access_token_2 = manager.async_create_access_token(refresh_token_2)
jwt_key_2 = refresh_token_2.jwt_key
assert access_token != access_token_2
assert jwt_key != jwt_key_2
rt = await manager.async_validate_access_token(access_token_2)
jwt_payload = jwt.decode(
access_token_2, rt.jwt_key, algorithm=['HS256'])
assert jwt_payload['iss'] == refresh_token_2.id
assert jwt_payload['exp'] - jwt_payload['iat'] == \
timedelta(days=3000).total_seconds()
async def test_login_with_auth_module(mock_hass): async def test_login_with_auth_module(mock_hass):
"""Test login as existing user with auth module.""" """Test login as existing user with auth module."""
manager = await auth.auth_manager_from_config(mock_hass, [{ manager = await auth.auth_manager_from_config(mock_hass, [{

View File

@ -2,6 +2,8 @@
from datetime import timedelta from datetime import timedelta
from unittest.mock import patch from unittest.mock import patch
from homeassistant import const
from homeassistant.auth import auth_manager_from_config
from homeassistant.auth.models import Credentials from homeassistant.auth.models import Credentials
from homeassistant.components.auth import RESULT_TYPE_USER from homeassistant.components.auth import RESULT_TYPE_USER
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
@ -10,7 +12,8 @@ from homeassistant.components import auth
from . import async_setup_auth from . import async_setup_auth
from tests.common import CLIENT_ID, CLIENT_REDIRECT_URI, MockUser from tests.common import CLIENT_ID, CLIENT_REDIRECT_URI, MockUser, \
ensure_auth_manager_loaded
async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client): async def test_login_new_user_and_trying_refresh_token(hass, aiohttp_client):
@ -267,3 +270,57 @@ async def test_revoking_refresh_token(hass, aiohttp_client):
}) })
assert resp.status == 400 assert resp.status == 400
async def test_ws_long_lived_access_token(hass, hass_ws_client):
"""Test generate long-lived access token."""
hass.auth = await auth_manager_from_config(
hass, provider_configs=[{
'type': 'insecure_example',
'users': [{
'username': 'test-user',
'password': 'test-pass',
'name': 'Test Name',
}]
}], module_configs=[])
ensure_auth_manager_loaded(hass.auth)
assert await async_setup_component(hass, 'auth', {'http': {}})
assert await async_setup_component(hass, 'api', {'http': {}})
user = MockUser(id='mock-user').add_to_hass(hass)
cred = await hass.auth.auth_providers[0].async_get_or_create_credentials(
{'username': 'test-user'})
await hass.auth.async_link_user(user, cred)
ws_client = await hass_ws_client(hass, hass.auth.async_create_access_token(
await hass.auth.async_create_refresh_token(user, CLIENT_ID)))
# verify create long-lived access token
await ws_client.send_json({
'id': 5,
'type': auth.WS_TYPE_LONG_LIVED_ACCESS_TOKEN,
'client_name': 'GPS Logger',
'lifespan': 365,
})
result = await ws_client.receive_json()
assert result['success'], result
long_lived_access_token = result['result']
assert long_lived_access_token is not None
refresh_token = await hass.auth.async_validate_access_token(
long_lived_access_token)
assert refresh_token.client_id is None
assert refresh_token.client_name == 'GPS Logger'
assert refresh_token.client_icon is None
# verify long-lived access token can be used as bearer token
api_client = ws_client.client
resp = await api_client.get(const.URL_API)
assert resp.status == 401
resp = await api_client.get(const.URL_API, headers={
'Authorization': 'Bearer {}'.format(long_lived_access_token)
})
assert resp.status == 200

View File

@ -34,6 +34,8 @@ def hass_ws_client(aiohttp_client):
auth_ok = await websocket.receive_json() auth_ok = await websocket.receive_json()
assert auth_ok['type'] == wapi.TYPE_AUTH_OK assert auth_ok['type'] == wapi.TYPE_AUTH_OK
# wrap in client
websocket.client = client
return websocket return websocket
return create_client return create_client