diff --git a/homeassistant/auth/providers/command_line.py b/homeassistant/auth/providers/command_line.py new file mode 100644 index 00000000000..9cec34c1340 --- /dev/null +++ b/homeassistant/auth/providers/command_line.py @@ -0,0 +1,164 @@ +"""Auth provider that validates credentials via an external command.""" + +from typing import Any, Dict, Optional, cast + +import asyncio.subprocess +import collections +import logging +import os + +import voluptuous as vol + +from homeassistant.exceptions import HomeAssistantError + +from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow +from ..models import Credentials, UserMeta + + +CONF_COMMAND = "command" +CONF_ARGS = "args" +CONF_META = "meta" + +CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend({ + vol.Required(CONF_COMMAND): vol.All( + str, + os.path.normpath, + msg="must be an absolute path" + ), + vol.Optional(CONF_ARGS, default=None): vol.Any(vol.DefaultTo(list), [str]), + vol.Optional(CONF_META, default=False): bool, +}, extra=vol.PREVENT_EXTRA) + +_LOGGER = logging.getLogger(__name__) + + +class InvalidAuthError(HomeAssistantError): + """Raised when authentication with given credentials fails.""" + + +@AUTH_PROVIDERS.register("command_line") +class CommandLineAuthProvider(AuthProvider): + """Auth provider validating credentials by calling a command.""" + + DEFAULT_TITLE = "Command Line Authentication" + + # which keys to accept from a program's stdout + ALLOWED_META_KEYS = ("name",) + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Extend parent's __init__. + + Adds self._user_meta dictionary to hold the user-specific + attributes provided by external programs. + """ + super().__init__(*args, **kwargs) + self._user_meta = {} # type: Dict[str, Dict[str, Any]] + + async def async_login_flow(self, context: Optional[dict]) -> LoginFlow: + """Return a flow to login.""" + return CommandLineLoginFlow(self) + + async def async_validate_login(self, username: str, password: str) -> None: + """Validate a username and password.""" + env = { + "username": username, + "password": password, + } + try: + # pylint: disable=no-member + process = await asyncio.subprocess.create_subprocess_exec( + self.config[CONF_COMMAND], *self.config[CONF_ARGS], + env=env, + stdout=asyncio.subprocess.PIPE + if self.config[CONF_META] else None, + ) + stdout, _ = (await process.communicate()) + except OSError as err: + # happens when command doesn't exist or permission is denied + _LOGGER.error("Error while authenticating %r: %s", + username, err) + raise InvalidAuthError + + if process.returncode != 0: + _LOGGER.error("User %r failed to authenticate, command exited " + "with code %d.", + username, process.returncode) + raise InvalidAuthError + + if self.config[CONF_META]: + meta = {} # type: Dict[str, str] + for _line in stdout.splitlines(): + try: + line = _line.decode().lstrip() + if line.startswith("#"): + continue + key, value = line.split("=", 1) + except ValueError: + # malformed line + continue + key = key.strip() + value = value.strip() + if key in self.ALLOWED_META_KEYS: + meta[key] = value + self._user_meta[username] = meta + + async def async_get_or_create_credentials( + self, flow_result: Dict[str, str] + ) -> Credentials: + """Get credentials based on the flow result.""" + username = flow_result["username"] + for credential in await self.async_credentials(): + if credential.data["username"] == username: + return credential + + # Create new credentials. + return self.async_create_credentials({ + "username": username, + }) + + async def async_user_meta_for_credentials( + self, credentials: Credentials + ) -> UserMeta: + """Return extra user metadata for credentials. + + Currently, only name is supported. + """ + meta = self._user_meta.get(credentials.data["username"], {}) + return UserMeta( + name=meta.get("name"), + is_active=True, + ) + + +class CommandLineLoginFlow(LoginFlow): + """Handler for the login flow.""" + + async def async_step_init( + self, user_input: Optional[Dict[str, str]] = None + ) -> Dict[str, Any]: + """Handle the step of the form.""" + errors = {} + + if user_input is not None: + user_input["username"] = user_input["username"].strip() + try: + await cast(CommandLineAuthProvider, self._auth_provider) \ + .async_validate_login( + user_input["username"], user_input["password"] + ) + except InvalidAuthError: + errors["base"] = "invalid_auth" + + if not errors: + user_input.pop("password") + return await self.async_finish(user_input) + + schema = collections.OrderedDict() # type: Dict[str, type] + schema["username"] = str + schema["password"] = str + + return self.async_show_form( + step_id="init", + data_schema=vol.Schema(schema), + errors=errors, + ) diff --git a/tests/auth/providers/test_command_line.py b/tests/auth/providers/test_command_line.py new file mode 100644 index 00000000000..f22958e7e38 --- /dev/null +++ b/tests/auth/providers/test_command_line.py @@ -0,0 +1,148 @@ +"""Tests for the command_line auth provider.""" + +from unittest.mock import Mock +import os +import uuid + +import pytest + +from homeassistant import data_entry_flow +from homeassistant.auth import auth_store, models as auth_models, AuthManager +from homeassistant.auth.providers import command_line +from homeassistant.const import CONF_TYPE + +from tests.common import mock_coro + + +@pytest.fixture +def store(hass): + """Mock store.""" + return auth_store.AuthStore(hass) + + +@pytest.fixture +def provider(hass, store): + """Mock provider.""" + return command_line.CommandLineAuthProvider(hass, store, { + CONF_TYPE: "command_line", + command_line.CONF_COMMAND: os.path.join( + os.path.dirname(__file__), "test_command_line_cmd.sh" + ), + command_line.CONF_ARGS: [], + command_line.CONF_META: False, + }) + + +@pytest.fixture +def manager(hass, store, provider): + """Mock manager.""" + return AuthManager(hass, store, { + (provider.type, provider.id): provider + }, {}) + + +async def test_create_new_credential(manager, provider): + """Test that we create a new credential.""" + credentials = await provider.async_get_or_create_credentials({ + "username": "good-user", + "password": "good-pass", + }) + assert credentials.is_new is True + + user = await manager.async_get_or_create_user(credentials) + assert user.is_active + + +async def test_match_existing_credentials(store, provider): + """See if we match existing users.""" + existing = auth_models.Credentials( + id=uuid.uuid4(), + auth_provider_type="command_line", + auth_provider_id=None, + data={ + "username": "good-user" + }, + is_new=False, + ) + provider.async_credentials = Mock(return_value=mock_coro([existing])) + credentials = await provider.async_get_or_create_credentials({ + "username": "good-user", + "password": "irrelevant", + }) + assert credentials is existing + + +async def test_invalid_username(provider): + """Test we raise if incorrect user specified.""" + with pytest.raises(command_line.InvalidAuthError): + await provider.async_validate_login("bad-user", "good-pass") + + +async def test_invalid_password(provider): + """Test we raise if incorrect password specified.""" + with pytest.raises(command_line.InvalidAuthError): + await provider.async_validate_login("good-user", "bad-pass") + + +async def test_good_auth(provider): + """Test nothing is raised with good credentials.""" + await provider.async_validate_login("good-user", "good-pass") + + +async def test_good_auth_with_meta(manager, provider): + """Test metadata is added upon successful authentication.""" + provider.config[command_line.CONF_ARGS] = ["--with-meta"] + provider.config[command_line.CONF_META] = True + + await provider.async_validate_login("good-user", "good-pass") + + credentials = await provider.async_get_or_create_credentials({ + "username": "good-user", + "password": "good-pass", + }) + assert credentials.is_new is True + + user = await manager.async_get_or_create_user(credentials) + assert user.name == "Bob" + assert user.is_active + + +async def test_utf_8_username_password(provider): + """Test that we create a new credential.""" + credentials = await provider.async_get_or_create_credentials({ + "username": "ßßß", + "password": "äöü", + }) + assert credentials.is_new is True + + +async def test_login_flow_validates(provider): + """Test login flow.""" + flow = await provider.async_login_flow({}) + result = await flow.async_step_init() + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + + result = await flow.async_step_init({ + "username": "bad-user", + "password": "bad-pass", + }) + assert result["type"] == data_entry_flow.RESULT_TYPE_FORM + assert result['errors']["base"] == "invalid_auth" + + result = await flow.async_step_init({ + "username": "good-user", + "password": "good-pass", + }) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["data"]["username"] == "good-user" + + +async def test_strip_username(provider): + """Test authentication works with username with whitespace around.""" + flow = await provider.async_login_flow({}) + result = await flow.async_step_init({ + "username": "\t\ngood-user ", + "password": "good-pass", + }) + assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY + assert result["data"]["username"] == "good-user" diff --git a/tests/auth/providers/test_command_line_cmd.sh b/tests/auth/providers/test_command_line_cmd.sh new file mode 100755 index 00000000000..0e689e338f1 --- /dev/null +++ b/tests/auth/providers/test_command_line_cmd.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +if [ "$username" = "good-user" ] && [ "$password" = "good-pass" ]; then + echo "Auth should succeed." >&2 + if [ "$1" = "--with-meta" ]; then + echo "name=Bob" + fi + exit 0 +fi + +echo "Auth should fail." >&2 +exit 1