mirror of
https://github.com/home-assistant/core.git
synced 2025-07-20 11:47:06 +00:00
Auth typing improvements (#15640)
* Always return bytes from auth.providers.homeassistant.hash_password Good for interface cleanliness, typing etc. * Add some homeassistant auth provider type annotations
This commit is contained in:
parent
397f551e6d
commit
68f03dcc67
@ -3,6 +3,7 @@ import base64
|
|||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
import hashlib
|
import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
|
from typing import Dict # noqa: F401 pylint: disable=unused-import
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
@ -68,12 +69,12 @@ class Data:
|
|||||||
"""Return users."""
|
"""Return users."""
|
||||||
return self._data['users']
|
return self._data['users']
|
||||||
|
|
||||||
def validate_login(self, username, password):
|
def validate_login(self, username: str, password: str) -> None:
|
||||||
"""Validate a username and password.
|
"""Validate a username and password.
|
||||||
|
|
||||||
Raises InvalidAuth if auth invalid.
|
Raises InvalidAuth if auth invalid.
|
||||||
"""
|
"""
|
||||||
password = self.hash_password(password)
|
hashed = self.hash_password(password)
|
||||||
|
|
||||||
found = None
|
found = None
|
||||||
|
|
||||||
@ -84,33 +85,33 @@ class Data:
|
|||||||
|
|
||||||
if found is None:
|
if found is None:
|
||||||
# Do one more compare to make timing the same as if user was found.
|
# Do one more compare to make timing the same as if user was found.
|
||||||
hmac.compare_digest(password, password)
|
hmac.compare_digest(hashed, hashed)
|
||||||
raise InvalidAuth
|
raise InvalidAuth
|
||||||
|
|
||||||
if not hmac.compare_digest(password,
|
if not hmac.compare_digest(hashed,
|
||||||
base64.b64decode(found['password'])):
|
base64.b64decode(found['password'])):
|
||||||
raise InvalidAuth
|
raise InvalidAuth
|
||||||
|
|
||||||
def hash_password(self, password, for_storage=False):
|
def hash_password(self, password: str, for_storage: bool = False) -> bytes:
|
||||||
"""Encode a password."""
|
"""Encode a password."""
|
||||||
hashed = hashlib.pbkdf2_hmac(
|
hashed = hashlib.pbkdf2_hmac(
|
||||||
'sha512', password.encode(), self._data['salt'].encode(), 100000)
|
'sha512', password.encode(), self._data['salt'].encode(), 100000)
|
||||||
if for_storage:
|
if for_storage:
|
||||||
hashed = base64.b64encode(hashed).decode()
|
hashed = base64.b64encode(hashed)
|
||||||
return hashed
|
return hashed
|
||||||
|
|
||||||
def add_auth(self, username, password):
|
def add_auth(self, username: str, password: str) -> None:
|
||||||
"""Add a new authenticated user/pass."""
|
"""Add a new authenticated user/pass."""
|
||||||
if any(user['username'] == username for user in self.users):
|
if any(user['username'] == username for user in self.users):
|
||||||
raise InvalidUser
|
raise InvalidUser
|
||||||
|
|
||||||
self.users.append({
|
self.users.append({
|
||||||
'username': username,
|
'username': username,
|
||||||
'password': self.hash_password(password, True),
|
'password': self.hash_password(password, True).decode(),
|
||||||
})
|
})
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_remove_auth(self, username):
|
def async_remove_auth(self, username: str) -> None:
|
||||||
"""Remove authentication."""
|
"""Remove authentication."""
|
||||||
index = None
|
index = None
|
||||||
for i, user in enumerate(self.users):
|
for i, user in enumerate(self.users):
|
||||||
@ -123,14 +124,15 @@ class Data:
|
|||||||
|
|
||||||
self.users.pop(index)
|
self.users.pop(index)
|
||||||
|
|
||||||
def change_password(self, username, new_password):
|
def change_password(self, username: str, new_password: str) -> None:
|
||||||
"""Update the password.
|
"""Update the password.
|
||||||
|
|
||||||
Raises InvalidUser if user cannot be found.
|
Raises InvalidUser if user cannot be found.
|
||||||
"""
|
"""
|
||||||
for user in self.users:
|
for user in self.users:
|
||||||
if user['username'] == username:
|
if user['username'] == username:
|
||||||
user['password'] = self.hash_password(new_password, True)
|
user['password'] = self.hash_password(
|
||||||
|
new_password, True).decode()
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
raise InvalidUser
|
raise InvalidUser
|
||||||
@ -160,7 +162,7 @@ class HassAuthProvider(AuthProvider):
|
|||||||
"""Return a flow to login."""
|
"""Return a flow to login."""
|
||||||
return LoginFlow(self)
|
return LoginFlow(self)
|
||||||
|
|
||||||
async def async_validate_login(self, username, password):
|
async def async_validate_login(self, username: str, password: str):
|
||||||
"""Helper to validate a username and password."""
|
"""Helper to validate a username and password."""
|
||||||
if self.data is None:
|
if self.data is None:
|
||||||
await self.async_initialize()
|
await self.async_initialize()
|
||||||
@ -225,7 +227,7 @@ class LoginFlow(data_entry_flow.FlowHandler):
|
|||||||
data=user_input
|
data=user_input
|
||||||
)
|
)
|
||||||
|
|
||||||
schema = OrderedDict()
|
schema = OrderedDict() # type: Dict[str, type]
|
||||||
schema['username'] = str
|
schema['username'] = str
|
||||||
schema['password'] = str
|
schema['password'] = str
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user