mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 16:57:53 +00:00
Added command_line auth provider that validates credentials by calling a command (#19985)
* Added external auth provider that calls a configurable program Closes #19975 * Raise proper InvalidAuth exception on OSError during program execution * Changed name of external auth provider to command_line * Renamed program config option to command in command_line auth provider * Made meta variable parsing in command_line auth provider optional * Added tests for command_line auth provider * Fixed indentation * Suppressed wrong pylint warning * Fixed linting * Added test for command line auth provider login flow * Log error when user fails authentication * Use %r formatter instead of explicit repr() * Mix all used names of typing module into module namespace I consider this nasty and bad coding style, but was requested by @awarecan for consistency with the remaining codebase. * Small code style change * Strip usernames with command_line auth provider
This commit is contained in:
parent
fb1da53568
commit
06f3e8137a
164
homeassistant/auth/providers/command_line.py
Normal file
164
homeassistant/auth/providers/command_line.py
Normal file
@ -0,0 +1,164 @@
|
||||
"""Auth provider that validates credentials via an external command."""
|
||||
|
||||
from typing import Any, Dict, Optional, cast
|
||||
|
||||
import asyncio.subprocess
|
||||
import collections
|
||||
import logging
|
||||
import os
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow
|
||||
from ..models import Credentials, UserMeta
|
||||
|
||||
|
||||
CONF_COMMAND = "command"
|
||||
CONF_ARGS = "args"
|
||||
CONF_META = "meta"
|
||||
|
||||
CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend({
|
||||
vol.Required(CONF_COMMAND): vol.All(
|
||||
str,
|
||||
os.path.normpath,
|
||||
msg="must be an absolute path"
|
||||
),
|
||||
vol.Optional(CONF_ARGS, default=None): vol.Any(vol.DefaultTo(list), [str]),
|
||||
vol.Optional(CONF_META, default=False): bool,
|
||||
}, extra=vol.PREVENT_EXTRA)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InvalidAuthError(HomeAssistantError):
|
||||
"""Raised when authentication with given credentials fails."""
|
||||
|
||||
|
||||
@AUTH_PROVIDERS.register("command_line")
|
||||
class CommandLineAuthProvider(AuthProvider):
|
||||
"""Auth provider validating credentials by calling a command."""
|
||||
|
||||
DEFAULT_TITLE = "Command Line Authentication"
|
||||
|
||||
# which keys to accept from a program's stdout
|
||||
ALLOWED_META_KEYS = ("name",)
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""Extend parent's __init__.
|
||||
|
||||
Adds self._user_meta dictionary to hold the user-specific
|
||||
attributes provided by external programs.
|
||||
"""
|
||||
super().__init__(*args, **kwargs)
|
||||
self._user_meta = {} # type: Dict[str, Dict[str, Any]]
|
||||
|
||||
async def async_login_flow(self, context: Optional[dict]) -> LoginFlow:
|
||||
"""Return a flow to login."""
|
||||
return CommandLineLoginFlow(self)
|
||||
|
||||
async def async_validate_login(self, username: str, password: str) -> None:
|
||||
"""Validate a username and password."""
|
||||
env = {
|
||||
"username": username,
|
||||
"password": password,
|
||||
}
|
||||
try:
|
||||
# pylint: disable=no-member
|
||||
process = await asyncio.subprocess.create_subprocess_exec(
|
||||
self.config[CONF_COMMAND], *self.config[CONF_ARGS],
|
||||
env=env,
|
||||
stdout=asyncio.subprocess.PIPE
|
||||
if self.config[CONF_META] else None,
|
||||
)
|
||||
stdout, _ = (await process.communicate())
|
||||
except OSError as err:
|
||||
# happens when command doesn't exist or permission is denied
|
||||
_LOGGER.error("Error while authenticating %r: %s",
|
||||
username, err)
|
||||
raise InvalidAuthError
|
||||
|
||||
if process.returncode != 0:
|
||||
_LOGGER.error("User %r failed to authenticate, command exited "
|
||||
"with code %d.",
|
||||
username, process.returncode)
|
||||
raise InvalidAuthError
|
||||
|
||||
if self.config[CONF_META]:
|
||||
meta = {} # type: Dict[str, str]
|
||||
for _line in stdout.splitlines():
|
||||
try:
|
||||
line = _line.decode().lstrip()
|
||||
if line.startswith("#"):
|
||||
continue
|
||||
key, value = line.split("=", 1)
|
||||
except ValueError:
|
||||
# malformed line
|
||||
continue
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
if key in self.ALLOWED_META_KEYS:
|
||||
meta[key] = value
|
||||
self._user_meta[username] = meta
|
||||
|
||||
async def async_get_or_create_credentials(
|
||||
self, flow_result: Dict[str, str]
|
||||
) -> Credentials:
|
||||
"""Get credentials based on the flow result."""
|
||||
username = flow_result["username"]
|
||||
for credential in await self.async_credentials():
|
||||
if credential.data["username"] == username:
|
||||
return credential
|
||||
|
||||
# Create new credentials.
|
||||
return self.async_create_credentials({
|
||||
"username": username,
|
||||
})
|
||||
|
||||
async def async_user_meta_for_credentials(
|
||||
self, credentials: Credentials
|
||||
) -> UserMeta:
|
||||
"""Return extra user metadata for credentials.
|
||||
|
||||
Currently, only name is supported.
|
||||
"""
|
||||
meta = self._user_meta.get(credentials.data["username"], {})
|
||||
return UserMeta(
|
||||
name=meta.get("name"),
|
||||
is_active=True,
|
||||
)
|
||||
|
||||
|
||||
class CommandLineLoginFlow(LoginFlow):
|
||||
"""Handler for the login flow."""
|
||||
|
||||
async def async_step_init(
|
||||
self, user_input: Optional[Dict[str, str]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Handle the step of the form."""
|
||||
errors = {}
|
||||
|
||||
if user_input is not None:
|
||||
user_input["username"] = user_input["username"].strip()
|
||||
try:
|
||||
await cast(CommandLineAuthProvider, self._auth_provider) \
|
||||
.async_validate_login(
|
||||
user_input["username"], user_input["password"]
|
||||
)
|
||||
except InvalidAuthError:
|
||||
errors["base"] = "invalid_auth"
|
||||
|
||||
if not errors:
|
||||
user_input.pop("password")
|
||||
return await self.async_finish(user_input)
|
||||
|
||||
schema = collections.OrderedDict() # type: Dict[str, type]
|
||||
schema["username"] = str
|
||||
schema["password"] = str
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="init",
|
||||
data_schema=vol.Schema(schema),
|
||||
errors=errors,
|
||||
)
|
148
tests/auth/providers/test_command_line.py
Normal file
148
tests/auth/providers/test_command_line.py
Normal file
@ -0,0 +1,148 @@
|
||||
"""Tests for the command_line auth provider."""
|
||||
|
||||
from unittest.mock import Mock
|
||||
import os
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant import data_entry_flow
|
||||
from homeassistant.auth import auth_store, models as auth_models, AuthManager
|
||||
from homeassistant.auth.providers import command_line
|
||||
from homeassistant.const import CONF_TYPE
|
||||
|
||||
from tests.common import mock_coro
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def store(hass):
|
||||
"""Mock store."""
|
||||
return auth_store.AuthStore(hass)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def provider(hass, store):
|
||||
"""Mock provider."""
|
||||
return command_line.CommandLineAuthProvider(hass, store, {
|
||||
CONF_TYPE: "command_line",
|
||||
command_line.CONF_COMMAND: os.path.join(
|
||||
os.path.dirname(__file__), "test_command_line_cmd.sh"
|
||||
),
|
||||
command_line.CONF_ARGS: [],
|
||||
command_line.CONF_META: False,
|
||||
})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def manager(hass, store, provider):
|
||||
"""Mock manager."""
|
||||
return AuthManager(hass, store, {
|
||||
(provider.type, provider.id): provider
|
||||
}, {})
|
||||
|
||||
|
||||
async def test_create_new_credential(manager, provider):
|
||||
"""Test that we create a new credential."""
|
||||
credentials = await provider.async_get_or_create_credentials({
|
||||
"username": "good-user",
|
||||
"password": "good-pass",
|
||||
})
|
||||
assert credentials.is_new is True
|
||||
|
||||
user = await manager.async_get_or_create_user(credentials)
|
||||
assert user.is_active
|
||||
|
||||
|
||||
async def test_match_existing_credentials(store, provider):
|
||||
"""See if we match existing users."""
|
||||
existing = auth_models.Credentials(
|
||||
id=uuid.uuid4(),
|
||||
auth_provider_type="command_line",
|
||||
auth_provider_id=None,
|
||||
data={
|
||||
"username": "good-user"
|
||||
},
|
||||
is_new=False,
|
||||
)
|
||||
provider.async_credentials = Mock(return_value=mock_coro([existing]))
|
||||
credentials = await provider.async_get_or_create_credentials({
|
||||
"username": "good-user",
|
||||
"password": "irrelevant",
|
||||
})
|
||||
assert credentials is existing
|
||||
|
||||
|
||||
async def test_invalid_username(provider):
|
||||
"""Test we raise if incorrect user specified."""
|
||||
with pytest.raises(command_line.InvalidAuthError):
|
||||
await provider.async_validate_login("bad-user", "good-pass")
|
||||
|
||||
|
||||
async def test_invalid_password(provider):
|
||||
"""Test we raise if incorrect password specified."""
|
||||
with pytest.raises(command_line.InvalidAuthError):
|
||||
await provider.async_validate_login("good-user", "bad-pass")
|
||||
|
||||
|
||||
async def test_good_auth(provider):
|
||||
"""Test nothing is raised with good credentials."""
|
||||
await provider.async_validate_login("good-user", "good-pass")
|
||||
|
||||
|
||||
async def test_good_auth_with_meta(manager, provider):
|
||||
"""Test metadata is added upon successful authentication."""
|
||||
provider.config[command_line.CONF_ARGS] = ["--with-meta"]
|
||||
provider.config[command_line.CONF_META] = True
|
||||
|
||||
await provider.async_validate_login("good-user", "good-pass")
|
||||
|
||||
credentials = await provider.async_get_or_create_credentials({
|
||||
"username": "good-user",
|
||||
"password": "good-pass",
|
||||
})
|
||||
assert credentials.is_new is True
|
||||
|
||||
user = await manager.async_get_or_create_user(credentials)
|
||||
assert user.name == "Bob"
|
||||
assert user.is_active
|
||||
|
||||
|
||||
async def test_utf_8_username_password(provider):
|
||||
"""Test that we create a new credential."""
|
||||
credentials = await provider.async_get_or_create_credentials({
|
||||
"username": "ßßß",
|
||||
"password": "äöü",
|
||||
})
|
||||
assert credentials.is_new is True
|
||||
|
||||
|
||||
async def test_login_flow_validates(provider):
|
||||
"""Test login flow."""
|
||||
flow = await provider.async_login_flow({})
|
||||
result = await flow.async_step_init()
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||
|
||||
result = await flow.async_step_init({
|
||||
"username": "bad-user",
|
||||
"password": "bad-pass",
|
||||
})
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
|
||||
assert result['errors']["base"] == "invalid_auth"
|
||||
|
||||
result = await flow.async_step_init({
|
||||
"username": "good-user",
|
||||
"password": "good-pass",
|
||||
})
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
assert result["data"]["username"] == "good-user"
|
||||
|
||||
|
||||
async def test_strip_username(provider):
|
||||
"""Test authentication works with username with whitespace around."""
|
||||
flow = await provider.async_login_flow({})
|
||||
result = await flow.async_step_init({
|
||||
"username": "\t\ngood-user ",
|
||||
"password": "good-pass",
|
||||
})
|
||||
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
||||
assert result["data"]["username"] == "good-user"
|
12
tests/auth/providers/test_command_line_cmd.sh
Executable file
12
tests/auth/providers/test_command_line_cmd.sh
Executable file
@ -0,0 +1,12 @@
|
||||
#!/bin/sh
|
||||
|
||||
if [ "$username" = "good-user" ] && [ "$password" = "good-pass" ]; then
|
||||
echo "Auth should succeed." >&2
|
||||
if [ "$1" = "--with-meta" ]; then
|
||||
echo "name=Bob"
|
||||
fi
|
||||
exit 0
|
||||
fi
|
||||
|
||||
echo "Auth should fail." >&2
|
||||
exit 1
|
Loading…
x
Reference in New Issue
Block a user