Replace pbkdf2 with bcrypt (#16071)

* Replace pbkdf2 with bcrypt

bcrypt isn't inherently better than pbkdf2, but everything "just works"
out of the box.

  * the hash verification routine now only computes one hash per call
  * a per-user salt is built into the hash as opposed to the current
  global salt
  * bcrypt.checkpw() is immune to timing attacks regardless of input
  * hash strength is a function of real time benchmarks and a
  "difficulty" level, meaning we won't have to ever update the iteration
  count

* WIP: add hash upgrade mechanism

* WIP: clarify decode issue

* remove stale testing code

* Fix test

* Ensure incorrect legacy passwords fail

* Add better invalid legacy password test

* Lint

* Run tests in async scope
This commit is contained in:
Matt Hamilton 2018-08-26 16:50:31 -04:00 committed by Paulus Schoutsen
parent 47755fb1e9
commit bacecb4249
5 changed files with 135 additions and 8 deletions

View File

@ -5,13 +5,16 @@ import hashlib
import hmac import hmac
from typing import Any, Dict, List, Optional, cast from typing import Any, Dict, List, Optional, cast
import bcrypt
import voluptuous as vol import voluptuous as vol
from homeassistant.const import CONF_ID from homeassistant.const import CONF_ID
from homeassistant.core import callback, HomeAssistant from homeassistant.core import callback, HomeAssistant
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from homeassistant.util.async_ import run_coroutine_threadsafe
from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow
from ..models import Credentials, UserMeta from ..models import Credentials, UserMeta
from ..util import generate_secret from ..util import generate_secret
@ -74,8 +77,7 @@ class Data:
Raises InvalidAuth if auth invalid. Raises InvalidAuth if auth invalid.
""" """
hashed = self.hash_password(password) dummy = b'$2b$12$CiuFGszHx9eNHxPuQcwBWez4CwDTOcLTX5CbOpV6gef2nYuXkY7BO'
found = None found = None
# Compare all users to avoid timing attacks. # Compare all users to avoid timing attacks.
@ -84,22 +86,55 @@ class Data:
found = user found = user
if found is None: if found is None:
# Do one more compare to make timing the same as if user was found. # check a hash to make timing the same as if user was found
hmac.compare_digest(hashed, hashed) bcrypt.checkpw(b'foo',
dummy)
raise InvalidAuth raise InvalidAuth
if not hmac.compare_digest(hashed, user_hash = base64.b64decode(found['password'])
base64.b64decode(found['password'])):
# if the hash is not a bcrypt hash...
# provide a transparant upgrade for old pbkdf2 hash format
if not (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$')):
# IMPORTANT! validate the login, bail if invalid
hashed = self.legacy_hash_password(password)
if not hmac.compare_digest(hashed, user_hash):
raise InvalidAuth
# then re-hash the valid password with bcrypt
self.change_password(found['username'], password)
run_coroutine_threadsafe(
self.async_save(), self.hass.loop
).result()
user_hash = base64.b64decode(found['password'])
# bcrypt.checkpw is timing-safe
if not bcrypt.checkpw(password.encode(),
user_hash):
raise InvalidAuth raise InvalidAuth
def hash_password(self, password: str, for_storage: bool = False) -> bytes: def legacy_hash_password(self, password: str,
"""Encode a password.""" for_storage: bool = False) -> bytes:
"""LEGACY password encoding."""
# We're no longer storing salts in data, but if one exists we
# should be able to retrieve it.
salt = self._data['salt'].encode() # type: ignore salt = self._data['salt'].encode() # type: ignore
hashed = hashlib.pbkdf2_hmac('sha512', password.encode(), salt, 100000) hashed = hashlib.pbkdf2_hmac('sha512', password.encode(), salt, 100000)
if for_storage: if for_storage:
hashed = base64.b64encode(hashed) hashed = base64.b64encode(hashed)
return hashed return hashed
# pylint: disable=no-self-use
def hash_password(self, password: str, for_storage: bool = False) -> bytes:
"""Encode a password."""
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt(rounds=12)) \
# type: bytes
if for_storage:
hashed = base64.b64encode(hashed)
return hashed
def add_auth(self, username: str, password: str) -> None: def add_auth(self, username: str, password: str) -> None:
"""Add a new authenticated user/pass.""" """Add a new authenticated user/pass."""
if any(user['username'] == username for user in self.users): if any(user['username'] == username for user in self.users):

View File

@ -2,6 +2,7 @@ aiohttp==3.4.0
astral==1.6.1 astral==1.6.1
async_timeout==3.0.0 async_timeout==3.0.0
attrs==18.1.0 attrs==18.1.0
bcrypt==3.1.4
certifi>=2018.04.16 certifi>=2018.04.16
jinja2>=2.10 jinja2>=2.10
PyJWT==1.6.4 PyJWT==1.6.4

View File

@ -3,6 +3,7 @@ aiohttp==3.4.0
astral==1.6.1 astral==1.6.1
async_timeout==3.0.0 async_timeout==3.0.0
attrs==18.1.0 attrs==18.1.0
bcrypt==3.1.4
certifi>=2018.04.16 certifi>=2018.04.16
jinja2>=2.10 jinja2>=2.10
PyJWT==1.6.4 PyJWT==1.6.4

View File

@ -36,6 +36,7 @@ REQUIRES = [
'astral==1.6.1', 'astral==1.6.1',
'async_timeout==3.0.0', 'async_timeout==3.0.0',
'attrs==18.1.0', 'attrs==18.1.0',
'bcrypt==3.1.4',
'certifi>=2018.04.16', 'certifi>=2018.04.16',
'jinja2>=2.10', 'jinja2>=2.10',
'PyJWT==1.6.4', 'PyJWT==1.6.4',

View File

@ -1,6 +1,7 @@
"""Test the Home Assistant local auth provider.""" """Test the Home Assistant local auth provider."""
from unittest.mock import Mock from unittest.mock import Mock
import base64
import pytest import pytest
from homeassistant import data_entry_flow from homeassistant import data_entry_flow
@ -132,3 +133,91 @@ async def test_new_users_populate_values(hass, data):
user = await manager.async_get_or_create_user(credentials) user = await manager.async_get_or_create_user(credentials)
assert user.name == 'hello' assert user.name == 'hello'
assert user.is_active assert user.is_active
async def test_new_hashes_are_bcrypt(data, hass):
"""Test that newly created hashes are using bcrypt."""
data.add_auth('newuser', 'newpass')
found = None
for user in data.users:
if user['username'] == 'newuser':
found = user
assert found is not None
user_hash = base64.b64decode(found['password'])
assert (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$'))
async def test_pbkdf2_to_bcrypt_hash_upgrade(hass_storage, hass):
"""Test migrating user from pbkdf2 hash to bcrypt hash."""
hass_storage[hass_auth.STORAGE_KEY] = {
'version': hass_auth.STORAGE_VERSION,
'key': hass_auth.STORAGE_KEY,
'data': {
'salt': '09c52f0b120eaa7dea5f73f9a9b985f3d493b30a08f3f2945ef613'
'0b08e6a3ea',
'users': [
{
'password': 'L5PAbehB8LAQI2Ixu+d+PDNJKmljqLnBcYWYw35onC/8D'
'BM1SpvT6A8ZFael5+deCt+s+43J08IcztnguouHSw==',
'username': 'legacyuser'
}
]
},
}
data = hass_auth.Data(hass)
await data.async_load()
# verify the correct (pbkdf2) password successfuly authenticates the user
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'beer')
# ...and that the hashes are now bcrypt hashes
user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])
assert (user_hash.startswith(b'$2a$')
or user_hash.startswith(b'$2b$')
or user_hash.startswith(b'$2x$')
or user_hash.startswith(b'$2y$'))
async def test_pbkdf2_to_bcrypt_hash_upgrade_with_incorrect_pass(hass_storage,
hass):
"""Test migrating user from pbkdf2 hash to bcrypt hash."""
hass_storage[hass_auth.STORAGE_KEY] = {
'version': hass_auth.STORAGE_VERSION,
'key': hass_auth.STORAGE_KEY,
'data': {
'salt': '09c52f0b120eaa7dea5f73f9a9b985f3d493b30a08f3f2945ef613'
'0b08e6a3ea',
'users': [
{
'password': 'L5PAbehB8LAQI2Ixu+d+PDNJKmljqLnBcYWYw35onC/8D'
'BM1SpvT6A8ZFael5+deCt+s+43J08IcztnguouHSw==',
'username': 'legacyuser'
}
]
},
}
data = hass_auth.Data(hass)
await data.async_load()
orig_user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])
# Make sure invalid legacy passwords fail
with pytest.raises(hass_auth.InvalidAuth):
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'wine')
# Make sure we don't change the password/hash when password is incorrect
with pytest.raises(hass_auth.InvalidAuth):
await hass.async_add_executor_job(
data.validate_login, 'legacyuser', 'wine')
same_user_hash = base64.b64decode(
hass_storage[hass_auth.STORAGE_KEY]['data']['users'][0]['password'])
assert orig_user_hash == same_user_hash