mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 16:57:53 +00:00
Add change username endpoint (#109057)
This commit is contained in:
parent
1e3ee8419f
commit
641507a45a
@ -374,6 +374,13 @@ class AuthManager:
|
||||
|
||||
self.hass.bus.async_fire(EVENT_USER_UPDATED, {"user_id": user.id})
|
||||
|
||||
@callback
|
||||
def async_update_user_credentials_data(
|
||||
self, credentials: models.Credentials, data: dict[str, Any]
|
||||
) -> None:
|
||||
"""Update credentials data."""
|
||||
self._store.async_update_user_credentials_data(credentials, data=data)
|
||||
|
||||
async def async_activate_user(self, user: models.User) -> None:
|
||||
"""Activate a user."""
|
||||
await self._store.async_activate_user(user)
|
||||
|
@ -296,6 +296,14 @@ class AuthStore:
|
||||
refresh_token.expire_at = None
|
||||
self._async_schedule_save()
|
||||
|
||||
@callback
|
||||
def async_update_user_credentials_data(
|
||||
self, credentials: models.Credentials, data: dict[str, Any]
|
||||
) -> None:
|
||||
"""Update credentials data."""
|
||||
credentials.data = data
|
||||
self._async_schedule_save()
|
||||
|
||||
async def async_load(self) -> None: # noqa: C901
|
||||
"""Load the users."""
|
||||
if self._loaded:
|
||||
|
@ -55,6 +55,27 @@ class InvalidUser(HomeAssistantError):
|
||||
"""
|
||||
|
||||
|
||||
class InvalidUsername(InvalidUser):
|
||||
"""Raised when invalid username is specified.
|
||||
|
||||
Will not be raised when validating authentication.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*args: object,
|
||||
translation_key: str | None = None,
|
||||
translation_placeholders: dict[str, str] | None = None,
|
||||
) -> None:
|
||||
"""Initialize exception."""
|
||||
super().__init__(
|
||||
*args,
|
||||
translation_domain="auth",
|
||||
translation_key=translation_key,
|
||||
translation_placeholders=translation_placeholders,
|
||||
)
|
||||
|
||||
|
||||
class Data:
|
||||
"""Hold the user data."""
|
||||
|
||||
@ -71,9 +92,11 @@ class Data:
|
||||
self.is_legacy = False
|
||||
|
||||
@callback
|
||||
def normalize_username(self, username: str) -> str:
|
||||
def normalize_username(
|
||||
self, username: str, *, force_normalize: bool = False
|
||||
) -> str:
|
||||
"""Normalize a username based on the mode."""
|
||||
if self.is_legacy:
|
||||
if self.is_legacy and not force_normalize:
|
||||
return username
|
||||
|
||||
return username.strip().casefold()
|
||||
@ -162,13 +185,11 @@ class Data:
|
||||
return hashed
|
||||
|
||||
def add_auth(self, username: str, password: str) -> None:
|
||||
"""Add a new authenticated user/pass."""
|
||||
username = self.normalize_username(username)
|
||||
"""Add a new authenticated user/pass.
|
||||
|
||||
if any(
|
||||
self.normalize_username(user["username"]) == username for user in self.users
|
||||
):
|
||||
raise InvalidUser
|
||||
Raises InvalidUsername if the new username is invalid.
|
||||
"""
|
||||
self._validate_new_username(username)
|
||||
|
||||
self.users.append(
|
||||
{
|
||||
@ -207,6 +228,45 @@ class Data:
|
||||
else:
|
||||
raise InvalidUser
|
||||
|
||||
def _validate_new_username(self, new_username: str) -> None:
|
||||
"""Validate that username is normalized and unique.
|
||||
|
||||
Raises InvalidUsername if the new username is invalid.
|
||||
"""
|
||||
normalized_username = self.normalize_username(
|
||||
new_username, force_normalize=True
|
||||
)
|
||||
if normalized_username != new_username:
|
||||
raise InvalidUsername(
|
||||
translation_key="username_not_normalized",
|
||||
translation_placeholders={"new_username": new_username},
|
||||
)
|
||||
|
||||
if any(
|
||||
self.normalize_username(user["username"]) == normalized_username
|
||||
for user in self.users
|
||||
):
|
||||
raise InvalidUsername(
|
||||
translation_key="username_already_exists",
|
||||
translation_placeholders={"username": new_username},
|
||||
)
|
||||
|
||||
def change_username(self, username: str, new_username: str) -> None:
|
||||
"""Update the username.
|
||||
|
||||
Raises InvalidUser if user cannot be found.
|
||||
Raises InvalidUsername if the new username is invalid.
|
||||
"""
|
||||
username = self.normalize_username(username)
|
||||
self._validate_new_username(new_username)
|
||||
|
||||
for user in self.users:
|
||||
if self.normalize_username(user["username"]) == username:
|
||||
user["username"] = new_username
|
||||
break
|
||||
else:
|
||||
raise InvalidUser
|
||||
|
||||
async def async_save(self) -> None:
|
||||
"""Save data."""
|
||||
if self._data is not None:
|
||||
@ -278,6 +338,22 @@ class HassAuthProvider(AuthProvider):
|
||||
)
|
||||
await self.data.async_save()
|
||||
|
||||
async def async_change_username(
|
||||
self, credential: Credentials, new_username: str
|
||||
) -> None:
|
||||
"""Validate new username and change it including updating credentials object."""
|
||||
if self.data is None:
|
||||
await self.async_initialize()
|
||||
assert self.data is not None
|
||||
|
||||
await self.hass.async_add_executor_job(
|
||||
self.data.change_username, credential.data["username"], new_username
|
||||
)
|
||||
self.hass.auth.async_update_user_credentials_data(
|
||||
credential, {**credential.data, "username": new_username}
|
||||
)
|
||||
await self.data.async_save()
|
||||
|
||||
async def async_get_or_create_credentials(
|
||||
self, flow_result: Mapping[str, str]
|
||||
) -> Credentials:
|
||||
|
@ -31,5 +31,13 @@
|
||||
"invalid_code": "Invalid code, please try again."
|
||||
}
|
||||
}
|
||||
},
|
||||
"exceptions": {
|
||||
"username_already_exists": {
|
||||
"message": "Username \"{username}\" already exists"
|
||||
},
|
||||
"username_not_normalized": {
|
||||
"message": "Username \"{new_username}\" is not normalized"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ def async_setup(hass: HomeAssistant) -> bool:
|
||||
websocket_api.async_register_command(hass, websocket_delete)
|
||||
websocket_api.async_register_command(hass, websocket_change_password)
|
||||
websocket_api.async_register_command(hass, websocket_admin_change_password)
|
||||
websocket_api.async_register_command(hass, websocket_admin_change_username)
|
||||
return True
|
||||
|
||||
|
||||
@ -194,3 +195,44 @@ async def websocket_admin_change_password(
|
||||
msg["id"], "credentials_not_found", "Credentials not found"
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required(
|
||||
"type"
|
||||
): "config/auth_provider/homeassistant/admin_change_username",
|
||||
vol.Required("user_id"): str,
|
||||
vol.Required("username"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.async_response
|
||||
async def websocket_admin_change_username(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Change the username for any user."""
|
||||
if not connection.user.is_owner:
|
||||
raise Unauthorized(context=connection.context(msg))
|
||||
|
||||
if (user := await hass.auth.async_get_user(msg["user_id"])) is None:
|
||||
connection.send_error(msg["id"], "user_not_found", "User not found")
|
||||
return
|
||||
|
||||
provider = auth_ha.async_get_provider(hass)
|
||||
found_credential = None
|
||||
for credential in user.credentials:
|
||||
if credential.auth_provider_type == provider.type:
|
||||
found_credential = credential
|
||||
break
|
||||
|
||||
if found_credential is None:
|
||||
connection.send_error(
|
||||
msg["id"], "credentials_not_found", "Credentials not found"
|
||||
)
|
||||
return
|
||||
|
||||
await provider.async_change_username(found_credential, msg["username"])
|
||||
connection.send_result(msg["id"])
|
||||
|
@ -250,6 +250,14 @@ def gen_issues_schema(config: Config, integration: Integration) -> dict[str, Any
|
||||
}
|
||||
|
||||
|
||||
_EXCEPTIONS_SCHEMA = {
|
||||
vol.Optional("exceptions"): cv.schema_with_slug_keys(
|
||||
{vol.Optional("message"): translation_value_validator},
|
||||
slug_validator=cv.slug,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def gen_strings_schema(config: Config, integration: Integration) -> vol.Schema:
|
||||
"""Generate a strings schema."""
|
||||
return vol.Schema(
|
||||
@ -355,10 +363,7 @@ def gen_strings_schema(config: Config, integration: Integration) -> vol.Schema:
|
||||
),
|
||||
slug_validator=cv.slug,
|
||||
),
|
||||
vol.Optional("exceptions"): cv.schema_with_slug_keys(
|
||||
{vol.Optional("message"): translation_value_validator},
|
||||
slug_validator=cv.slug,
|
||||
),
|
||||
**_EXCEPTIONS_SCHEMA,
|
||||
vol.Optional("services"): cv.schema_with_slug_keys(
|
||||
{
|
||||
vol.Required("name"): translation_value_validator,
|
||||
@ -397,6 +402,7 @@ def gen_auth_schema(config: Config, integration: Integration) -> vol.Schema:
|
||||
)
|
||||
},
|
||||
vol.Optional("issues"): gen_issues_schema(config, integration),
|
||||
**_EXCEPTIONS_SCHEMA,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -13,10 +13,11 @@ from homeassistant.auth.providers import (
|
||||
homeassistant as hass_auth,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data(hass):
|
||||
def data(hass: HomeAssistant) -> hass_auth.Data:
|
||||
"""Create a loaded data class."""
|
||||
data = hass_auth.Data(hass)
|
||||
hass.loop.run_until_complete(data.async_load())
|
||||
@ -24,7 +25,7 @@ def data(hass):
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def legacy_data(hass):
|
||||
def legacy_data(hass: HomeAssistant) -> hass_auth.Data:
|
||||
"""Create a loaded legacy data class."""
|
||||
data = hass_auth.Data(hass)
|
||||
hass.loop.run_until_complete(data.async_load())
|
||||
@ -32,7 +33,13 @@ def legacy_data(hass):
|
||||
return data
|
||||
|
||||
|
||||
async def test_validating_password_invalid_user(data, hass: HomeAssistant) -> None:
|
||||
@pytest.fixture
|
||||
async def load_auth_component(hass: HomeAssistant) -> None:
|
||||
"""Load the auth component for translations."""
|
||||
await async_setup_component(hass, "auth", {})
|
||||
|
||||
|
||||
async def test_validating_password_invalid_user(data: hass_auth.Data) -> None:
|
||||
"""Test validating an invalid user."""
|
||||
with pytest.raises(hass_auth.InvalidAuth):
|
||||
data.validate_login("non-existing", "pw")
|
||||
@ -48,7 +55,9 @@ async def test_not_allow_set_id() -> None:
|
||||
)
|
||||
|
||||
|
||||
async def test_new_users_populate_values(hass: HomeAssistant, data) -> None:
|
||||
async def test_new_users_populate_values(
|
||||
hass: HomeAssistant, data: hass_auth.Data
|
||||
) -> None:
|
||||
"""Test that we populate data for new users."""
|
||||
data.add_auth("hello", "test-pass")
|
||||
await data.async_save()
|
||||
@ -61,7 +70,7 @@ async def test_new_users_populate_values(hass: HomeAssistant, data) -> None:
|
||||
assert user.is_active
|
||||
|
||||
|
||||
async def test_changing_password_raises_invalid_user(data, hass: HomeAssistant) -> None:
|
||||
async def test_changing_password_raises_invalid_user(data: hass_auth.Data) -> None:
|
||||
"""Test that changing password raises invalid user."""
|
||||
with pytest.raises(hass_auth.InvalidUser):
|
||||
data.change_password("non-existing", "pw")
|
||||
@ -70,20 +79,34 @@ async def test_changing_password_raises_invalid_user(data, hass: HomeAssistant)
|
||||
# Modern mode
|
||||
|
||||
|
||||
async def test_adding_user(data, hass: HomeAssistant) -> None:
|
||||
async def test_adding_user(data: hass_auth.Data) -> None:
|
||||
"""Test adding a user."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
data.validate_login(" test-user ", "test-pass")
|
||||
|
||||
|
||||
async def test_adding_user_duplicate_username(data, hass: HomeAssistant) -> None:
|
||||
@pytest.mark.parametrize("username", ["test-user ", "TEST-USER"])
|
||||
@pytest.mark.usefixtures("load_auth_component")
|
||||
def test_adding_user_not_normalized(data: hass_auth.Data, username: str) -> None:
|
||||
"""Test adding a user."""
|
||||
with pytest.raises(
|
||||
hass_auth.InvalidUsername, match=f'Username "{username}" is not normalized'
|
||||
):
|
||||
data.add_auth(username, "test-pass")
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("load_auth_component")
|
||||
def test_adding_user_duplicate_username(data: hass_auth.Data) -> None:
|
||||
"""Test adding a user with duplicate username."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
with pytest.raises(hass_auth.InvalidUser):
|
||||
data.add_auth("TEST-user ", "other-pass")
|
||||
|
||||
with pytest.raises(
|
||||
hass_auth.InvalidUsername, match='Username "test-user" already exists'
|
||||
):
|
||||
data.add_auth("test-user", "other-pass")
|
||||
|
||||
|
||||
async def test_validating_password_invalid_password(data, hass: HomeAssistant) -> None:
|
||||
async def test_validating_password_invalid_password(data: hass_auth.Data) -> None:
|
||||
"""Test validating an invalid password."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
|
||||
@ -97,7 +120,7 @@ async def test_validating_password_invalid_password(data, hass: HomeAssistant) -
|
||||
data.validate_login("test-user", "Test-pass")
|
||||
|
||||
|
||||
async def test_changing_password(data, hass: HomeAssistant) -> None:
|
||||
async def test_changing_password(data: hass_auth.Data) -> None:
|
||||
"""Test adding a user."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
data.change_password("TEST-USER ", "new-pass")
|
||||
@ -108,7 +131,7 @@ async def test_changing_password(data, hass: HomeAssistant) -> None:
|
||||
data.validate_login("test-UsEr", "new-pass")
|
||||
|
||||
|
||||
async def test_login_flow_validates(data, hass: HomeAssistant) -> None:
|
||||
async def test_login_flow_validates(data: hass_auth.Data, hass: HomeAssistant) -> None:
|
||||
"""Test login flow."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
await data.async_save()
|
||||
@ -139,7 +162,7 @@ async def test_login_flow_validates(data, hass: HomeAssistant) -> None:
|
||||
assert result["data"]["username"] == "test-USER"
|
||||
|
||||
|
||||
async def test_saving_loading(data, hass: HomeAssistant) -> None:
|
||||
async def test_saving_loading(data: hass_auth.Data, hass: HomeAssistant) -> None:
|
||||
"""Test saving and loading JSON."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
data.add_auth("second-user", "second-pass")
|
||||
@ -151,7 +174,9 @@ async def test_saving_loading(data, hass: HomeAssistant) -> None:
|
||||
data.validate_login("second-user ", "second-pass")
|
||||
|
||||
|
||||
async def test_get_or_create_credentials(hass: HomeAssistant, data) -> None:
|
||||
async def test_get_or_create_credentials(
|
||||
hass: HomeAssistant, data: hass_auth.Data
|
||||
) -> None:
|
||||
"""Test that we can get or create credentials."""
|
||||
manager = await auth_manager_from_config(hass, [{"type": "homeassistant"}], [])
|
||||
provider = manager.auth_providers[0]
|
||||
@ -167,26 +192,14 @@ async def test_get_or_create_credentials(hass: HomeAssistant, data) -> None:
|
||||
# Legacy mode
|
||||
|
||||
|
||||
async def test_legacy_adding_user(legacy_data, hass: HomeAssistant) -> None:
|
||||
async def test_legacy_adding_user(legacy_data: hass_auth.Data) -> None:
|
||||
"""Test in legacy mode adding a user."""
|
||||
legacy_data.add_auth("test-user", "test-pass")
|
||||
legacy_data.validate_login("test-user", "test-pass")
|
||||
|
||||
|
||||
async def test_legacy_adding_user_duplicate_username(
|
||||
legacy_data, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test in legacy mode adding a user with duplicate username."""
|
||||
legacy_data.add_auth("test-user", "test-pass")
|
||||
with pytest.raises(hass_auth.InvalidUser):
|
||||
legacy_data.add_auth("test-user", "other-pass")
|
||||
# Not considered duplicate
|
||||
legacy_data.add_auth("test-user ", "test-pass")
|
||||
legacy_data.add_auth("Test-user", "test-pass")
|
||||
|
||||
|
||||
async def test_legacy_validating_password_invalid_password(
|
||||
legacy_data, hass: HomeAssistant
|
||||
legacy_data: hass_auth.Data,
|
||||
) -> None:
|
||||
"""Test in legacy mode validating an invalid password."""
|
||||
legacy_data.add_auth("test-user", "test-pass")
|
||||
@ -195,7 +208,7 @@ async def test_legacy_validating_password_invalid_password(
|
||||
legacy_data.validate_login("test-user", "invalid-pass")
|
||||
|
||||
|
||||
async def test_legacy_changing_password(legacy_data, hass: HomeAssistant) -> None:
|
||||
async def test_legacy_changing_password(legacy_data: hass_auth.Data) -> None:
|
||||
"""Test in legacy mode adding a user."""
|
||||
user = "test-user"
|
||||
legacy_data.add_auth(user, "test-pass")
|
||||
@ -208,14 +221,16 @@ async def test_legacy_changing_password(legacy_data, hass: HomeAssistant) -> Non
|
||||
|
||||
|
||||
async def test_legacy_changing_password_raises_invalid_user(
|
||||
legacy_data, hass: HomeAssistant
|
||||
legacy_data: hass_auth.Data,
|
||||
) -> None:
|
||||
"""Test in legacy mode that we initialize an empty config."""
|
||||
with pytest.raises(hass_auth.InvalidUser):
|
||||
legacy_data.change_password("non-existing", "pw")
|
||||
|
||||
|
||||
async def test_legacy_login_flow_validates(legacy_data, hass: HomeAssistant) -> None:
|
||||
async def test_legacy_login_flow_validates(
|
||||
legacy_data: hass_auth.Data, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test in legacy mode login flow."""
|
||||
legacy_data.add_auth("test-user", "test-pass")
|
||||
await legacy_data.async_save()
|
||||
@ -246,7 +261,9 @@ async def test_legacy_login_flow_validates(legacy_data, hass: HomeAssistant) ->
|
||||
assert result["data"]["username"] == "test-user"
|
||||
|
||||
|
||||
async def test_legacy_saving_loading(legacy_data, hass: HomeAssistant) -> None:
|
||||
async def test_legacy_saving_loading(
|
||||
legacy_data: hass_auth.Data, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test in legacy mode saving and loading JSON."""
|
||||
legacy_data.add_auth("test-user", "test-pass")
|
||||
legacy_data.add_auth("second-user", "second-pass")
|
||||
@ -263,7 +280,7 @@ async def test_legacy_saving_loading(legacy_data, hass: HomeAssistant) -> None:
|
||||
|
||||
|
||||
async def test_legacy_get_or_create_credentials(
|
||||
hass: HomeAssistant, legacy_data
|
||||
hass: HomeAssistant, legacy_data: hass_auth.Data
|
||||
) -> None:
|
||||
"""Test in legacy mode that we can get or create credentials."""
|
||||
manager = await auth_manager_from_config(hass, [{"type": "homeassistant"}], [])
|
||||
@ -308,3 +325,67 @@ async def test_race_condition_in_data_loading(hass: HomeAssistant) -> None:
|
||||
assert isinstance(results[0], hass_auth.InvalidAuth)
|
||||
# results[1] will be a TypeError if race condition occurred
|
||||
assert isinstance(results[1], hass_auth.InvalidAuth)
|
||||
|
||||
|
||||
def test_change_username(data: hass_auth.Data) -> None:
|
||||
"""Test changing username."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
users = data.users
|
||||
assert len(users) == 1
|
||||
assert users[0]["username"] == "test-user"
|
||||
|
||||
data.change_username("test-user", "new-user")
|
||||
|
||||
users = data.users
|
||||
assert len(users) == 1
|
||||
assert users[0]["username"] == "new-user"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("username", ["test-user ", "TEST-USER"])
|
||||
def test_change_username_legacy(legacy_data: hass_auth.Data, username: str) -> None:
|
||||
"""Test changing username."""
|
||||
# Cannot use add_auth as it normalizes username
|
||||
legacy_data.users.append(
|
||||
{
|
||||
"username": username,
|
||||
"password": legacy_data.hash_password("test-pass", True).decode(),
|
||||
}
|
||||
)
|
||||
|
||||
users = legacy_data.users
|
||||
assert len(users) == 1
|
||||
assert users[0]["username"] == username
|
||||
|
||||
legacy_data.change_username(username, "test-user")
|
||||
|
||||
users = legacy_data.users
|
||||
assert len(users) == 1
|
||||
assert users[0]["username"] == "test-user"
|
||||
|
||||
|
||||
def test_change_username_invalid_user(data: hass_auth.Data) -> None:
|
||||
"""Test changing username raises on invalid user."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
users = data.users
|
||||
assert len(users) == 1
|
||||
assert users[0]["username"] == "test-user"
|
||||
|
||||
with pytest.raises(hass_auth.InvalidUser):
|
||||
data.change_username("non-existing", "new-user")
|
||||
|
||||
users = data.users
|
||||
assert len(users) == 1
|
||||
assert users[0]["username"] == "test-user"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("load_auth_component")
|
||||
async def test_change_username_not_normalized(
|
||||
data: hass_auth.Data, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test changing username raises on not normalized username."""
|
||||
data.add_auth("test-user", "test-pass")
|
||||
|
||||
with pytest.raises(
|
||||
hass_auth.InvalidUsername, match='Username "TEST-user " is not normalized'
|
||||
):
|
||||
data.change_username("test-user", "TEST-user ")
|
||||
|
@ -445,3 +445,170 @@ async def test_admin_change_password(
|
||||
assert result["success"], result
|
||||
|
||||
await auth_provider.async_validate_login("test-user", "new-pass")
|
||||
|
||||
|
||||
def _assert_username(
|
||||
local_auth: prov_ha.HassAuthProvider, username: str, *, should_exist: bool
|
||||
) -> None:
|
||||
if any(user["username"] == username for user in local_auth.data.users):
|
||||
if should_exist:
|
||||
return # found
|
||||
|
||||
pytest.fail(f"Found user with username {username} when not expected")
|
||||
|
||||
if should_exist:
|
||||
pytest.fail(f"Did not find user with username {username}")
|
||||
|
||||
|
||||
async def _test_admin_change_username(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
local_auth: prov_ha.HassAuthProvider,
|
||||
hass_admin_user: MockUser,
|
||||
owner_access_token: str,
|
||||
new_username: str,
|
||||
) -> dict[str, Any]:
|
||||
"""Test admin change username ws endpoint."""
|
||||
client = await hass_ws_client(hass, owner_access_token)
|
||||
current_username_user = hass_admin_user.credentials[0].data["username"]
|
||||
_assert_username(local_auth, current_username_user, should_exist=True)
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "config/auth_provider/homeassistant/admin_change_username",
|
||||
"user_id": hass_admin_user.id,
|
||||
"username": new_username,
|
||||
}
|
||||
)
|
||||
return await client.receive_json()
|
||||
|
||||
|
||||
async def test_admin_change_username_success(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
local_auth: prov_ha.HassAuthProvider,
|
||||
hass_admin_user: MockUser,
|
||||
owner_access_token: str,
|
||||
) -> None:
|
||||
"""Test that change username succeeds."""
|
||||
current_username = hass_admin_user.credentials[0].data["username"]
|
||||
new_username = "blabla"
|
||||
|
||||
result = await _test_admin_change_username(
|
||||
hass,
|
||||
hass_ws_client,
|
||||
local_auth,
|
||||
hass_admin_user,
|
||||
owner_access_token,
|
||||
new_username,
|
||||
)
|
||||
|
||||
assert result["success"], result
|
||||
_assert_username(local_auth, current_username, should_exist=False)
|
||||
_assert_username(local_auth, new_username, should_exist=True)
|
||||
assert hass_admin_user.credentials[0].data["username"] == new_username
|
||||
# Validate new login works
|
||||
await local_auth.async_validate_login(new_username, "test-pass")
|
||||
with pytest.raises(prov_ha.InvalidAuth):
|
||||
# Verify old login does not work
|
||||
await local_auth.async_validate_login(current_username, "test-pass")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("new_username", [" bla", "bla ", "BlA"])
|
||||
async def test_admin_change_username_error_not_normalized(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
local_auth: prov_ha.HassAuthProvider,
|
||||
hass_admin_user: MockUser,
|
||||
owner_access_token: str,
|
||||
new_username: str,
|
||||
) -> None:
|
||||
"""Test that change username raises error."""
|
||||
current_username = hass_admin_user.credentials[0].data["username"]
|
||||
|
||||
result = await _test_admin_change_username(
|
||||
hass,
|
||||
hass_ws_client,
|
||||
local_auth,
|
||||
hass_admin_user,
|
||||
owner_access_token,
|
||||
new_username,
|
||||
)
|
||||
assert not result["success"], result
|
||||
assert result["error"] == {
|
||||
"code": "home_assistant_error",
|
||||
"message": "username_not_normalized",
|
||||
"translation_key": "username_not_normalized",
|
||||
"translation_placeholders": {"new_username": new_username},
|
||||
"translation_domain": "auth",
|
||||
}
|
||||
_assert_username(local_auth, current_username, should_exist=True)
|
||||
_assert_username(local_auth, new_username, should_exist=False)
|
||||
assert hass_admin_user.credentials[0].data["username"] == current_username
|
||||
# Validate old login still works
|
||||
await local_auth.async_validate_login(current_username, "test-pass")
|
||||
|
||||
|
||||
async def test_admin_change_username_not_owner(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, auth_provider
|
||||
) -> None:
|
||||
"""Test that change username fails when not owner."""
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "config/auth_provider/homeassistant/admin_change_username",
|
||||
"user_id": "test-user",
|
||||
"username": "new-user",
|
||||
}
|
||||
)
|
||||
|
||||
result = await client.receive_json()
|
||||
assert not result["success"], result
|
||||
assert result["error"]["code"] == "unauthorized"
|
||||
|
||||
# Validate old login still works
|
||||
await auth_provider.async_validate_login("test-user", "test-pass")
|
||||
|
||||
|
||||
async def test_admin_change_username_no_user(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, owner_access_token
|
||||
) -> None:
|
||||
"""Test that change username fails with unknown user."""
|
||||
client = await hass_ws_client(hass, owner_access_token)
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "config/auth_provider/homeassistant/admin_change_username",
|
||||
"user_id": "non-existing",
|
||||
"username": "new-username",
|
||||
}
|
||||
)
|
||||
|
||||
result = await client.receive_json()
|
||||
assert not result["success"], result
|
||||
assert result["error"]["code"] == "user_not_found"
|
||||
|
||||
|
||||
async def test_admin_change_username_no_cred(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
owner_access_token,
|
||||
hass_admin_user: MockUser,
|
||||
) -> None:
|
||||
"""Test that change username fails with unknown credential."""
|
||||
|
||||
hass_admin_user.credentials.clear()
|
||||
client = await hass_ws_client(hass, owner_access_token)
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "config/auth_provider/homeassistant/admin_change_username",
|
||||
"user_id": hass_admin_user.id,
|
||||
"username": "new-username",
|
||||
}
|
||||
)
|
||||
|
||||
result = await client.receive_json()
|
||||
assert not result["success"], result
|
||||
assert result["error"]["code"] == "credentials_not_found"
|
||||
|
Loading…
x
Reference in New Issue
Block a user