From 61f7a39748d6e1930df58f2cfd65dd248e80d1e4 Mon Sep 17 00:00:00 2001 From: Paulus Schoutsen Date: Thu, 11 Oct 2018 19:24:25 +0200 Subject: [PATCH] Add permissions foundation (#16890) * Add permission foundation * Address comments * typing * False > True * Convert more lambdas * Use constants * Remove support for False * Fix only allow True --- homeassistant/auth/auth_store.py | 26 ++- homeassistant/auth/models.py | 24 +++ homeassistant/auth/permissions.py | 252 ++++++++++++++++++++++++++++++ tests/auth/test_init.py | 1 + tests/auth/test_models.py | 34 ++++ tests/auth/test_permissions.py | 198 +++++++++++++++++++++++ tests/common.py | 6 +- 7 files changed, 532 insertions(+), 9 deletions(-) create mode 100644 homeassistant/auth/permissions.py create mode 100644 tests/auth/test_models.py create mode 100644 tests/auth/test_permissions.py diff --git a/homeassistant/auth/auth_store.py b/homeassistant/auth/auth_store.py index 572393dc444..8c328bfe13e 100644 --- a/homeassistant/auth/auth_store.py +++ b/homeassistant/auth/auth_store.py @@ -10,6 +10,7 @@ from homeassistant.core import HomeAssistant, callback from homeassistant.util import dt as dt_util from . import models +from .permissions import DEFAULT_POLICY STORAGE_VERSION = 1 STORAGE_KEY = 'auth' @@ -245,12 +246,16 @@ class AuthStore: groups[group_dict['id']] = models.Group( name=group_dict['name'], id=group_dict['id'], + policy=group_dict.get('policy', DEFAULT_POLICY), ) migrate_group = None if not groups: - migrate_group = models.Group(name=INITIAL_GROUP_NAME) + migrate_group = models.Group( + name=INITIAL_GROUP_NAME, + policy=DEFAULT_POLICY + ) groups[migrate_group.id] = migrate_group for user_dict in data['users']: @@ -348,13 +353,17 @@ class AuthStore: for user in self._users.values() ] - groups = [ - { + groups = [] + for group in self._groups.values(): + g_dict = { 'name': group.name, 'id': group.id, - } - for group in self._groups.values() - ] + } # type: Dict[str, Any] + + if group.policy is not DEFAULT_POLICY: + g_dict['policy'] = group.policy + + groups.append(g_dict) credentials = [ { @@ -402,7 +411,10 @@ class AuthStore: self._users = OrderedDict() # type: Dict[str, models.User] # Add default group - all_access_group = models.Group(name=INITIAL_GROUP_NAME) + all_access_group = models.Group( + name=INITIAL_GROUP_NAME, + policy=DEFAULT_POLICY, + ) groups = OrderedDict() # type: Dict[str, models.Group] groups[all_access_group.id] = all_access_group diff --git a/homeassistant/auth/models.py b/homeassistant/auth/models.py index 7305e0e77b2..fc35f1398db 100644 --- a/homeassistant/auth/models.py +++ b/homeassistant/auth/models.py @@ -7,6 +7,7 @@ import attr from homeassistant.util import dt as dt_util +from . import permissions as perm_mdl from .util import generate_secret TOKEN_TYPE_NORMAL = 'normal' @@ -19,6 +20,7 @@ class Group: """A group.""" name = attr.ib(type=str) # type: Optional[str] + policy = attr.ib(type=perm_mdl.PolicyType) id = attr.ib(type=str, factory=lambda: uuid.uuid4().hex) @@ -44,6 +46,28 @@ class User: type=dict, factory=dict, cmp=False ) # type: Dict[str, RefreshToken] + _permissions = attr.ib( + type=perm_mdl.PolicyPermissions, + init=False, + cmp=False, + default=None, + ) + + @property + def permissions(self) -> perm_mdl.AbstractPermissions: + """Return permissions object for user.""" + if self.is_owner: + return perm_mdl.OwnerPermissions + + if self._permissions is not None: + return self._permissions + + self._permissions = perm_mdl.PolicyPermissions( + perm_mdl.merge_policies([ + group.policy for group in self.groups])) + + return self._permissions + @attr.s(slots=True) class RefreshToken: diff --git a/homeassistant/auth/permissions.py b/homeassistant/auth/permissions.py new file mode 100644 index 00000000000..82de61da7f9 --- /dev/null +++ b/homeassistant/auth/permissions.py @@ -0,0 +1,252 @@ +"""Permissions for Home Assistant.""" +from typing import ( # noqa: F401 + cast, Any, Callable, Dict, List, Mapping, Set, Tuple, Union) + +import voluptuous as vol + +from homeassistant.core import State + +CategoryType = Union[Mapping[str, 'CategoryType'], bool, None] +PolicyType = Mapping[str, CategoryType] + + +# Default policy if group has no policy applied. +DEFAULT_POLICY = { + "entities": True +} # type: PolicyType + +CAT_ENTITIES = 'entities' +ENTITY_DOMAINS = 'domains' +ENTITY_ENTITY_IDS = 'entity_ids' + +VALUES_SCHEMA = vol.Any(True, vol.Schema({ + str: True +})) + +ENTITY_POLICY_SCHEMA = vol.Any(True, vol.Schema({ + vol.Optional(ENTITY_DOMAINS): VALUES_SCHEMA, + vol.Optional(ENTITY_ENTITY_IDS): VALUES_SCHEMA, +})) + +POLICY_SCHEMA = vol.Schema({ + vol.Optional(CAT_ENTITIES): ENTITY_POLICY_SCHEMA +}) + + +class AbstractPermissions: + """Default permissions class.""" + + def check_entity(self, entity_id: str, *keys: str) -> bool: + """Test if we can access entity.""" + raise NotImplementedError + + def filter_states(self, states: List[State]) -> List[State]: + """Filter a list of states for what the user is allowed to see.""" + raise NotImplementedError + + +class PolicyPermissions(AbstractPermissions): + """Handle permissions.""" + + def __init__(self, policy: PolicyType) -> None: + """Initialize the permission class.""" + self._policy = policy + self._compiled = {} # type: Dict[str, Callable[..., bool]] + + def check_entity(self, entity_id: str, *keys: str) -> bool: + """Test if we can access entity.""" + func = self._policy_func(CAT_ENTITIES, _compile_entities) + return func(entity_id, keys) + + def filter_states(self, states: List[State]) -> List[State]: + """Filter a list of states for what the user is allowed to see.""" + func = self._policy_func(CAT_ENTITIES, _compile_entities) + keys = ('read',) + return [entity for entity in states if func(entity.entity_id, keys)] + + def _policy_func(self, category: str, + compile_func: Callable[[CategoryType], Callable]) \ + -> Callable[..., bool]: + """Get a policy function.""" + func = self._compiled.get(category) + + if func: + return func + + func = self._compiled[category] = compile_func( + self._policy.get(category)) + return func + + def __eq__(self, other: Any) -> bool: + """Equals check.""" + # pylint: disable=protected-access + return (isinstance(other, PolicyPermissions) and + other._policy == self._policy) + + +class _OwnerPermissions(AbstractPermissions): + """Owner permissions.""" + + # pylint: disable=no-self-use + + def check_entity(self, entity_id: str, *keys: str) -> bool: + """Test if we can access entity.""" + return True + + def filter_states(self, states: List[State]) -> List[State]: + """Filter a list of states for what the user is allowed to see.""" + return states + + +OwnerPermissions = _OwnerPermissions() # pylint: disable=invalid-name + + +def _compile_entities(policy: CategoryType) \ + -> Callable[[str, Tuple[str]], bool]: + """Compile policy into a function that tests policy.""" + # None, Empty Dict, False + if not policy: + def apply_policy_deny_all(entity_id: str, keys: Tuple[str]) -> bool: + """Decline all.""" + return False + + return apply_policy_deny_all + + if policy is True: + def apply_policy_allow_all(entity_id: str, keys: Tuple[str]) -> bool: + """Approve all.""" + return True + + return apply_policy_allow_all + + assert isinstance(policy, dict) + + domains = policy.get(ENTITY_DOMAINS) + entity_ids = policy.get(ENTITY_ENTITY_IDS) + + funcs = [] # type: List[Callable[[str, Tuple[str]], Union[None, bool]]] + + # The order of these functions matter. The more precise are at the top. + # If a function returns None, they cannot handle it. + # If a function returns a boolean, that's the result to return. + + # Setting entity_ids to a boolean is final decision for permissions + # So return right away. + if isinstance(entity_ids, bool): + def apply_entity_id_policy(entity_id: str, keys: Tuple[str]) -> bool: + """Test if allowed entity_id.""" + return entity_ids # type: ignore + + return apply_entity_id_policy + + if entity_ids is not None: + def allowed_entity_id(entity_id: str, keys: Tuple[str]) \ + -> Union[None, bool]: + """Test if allowed entity_id.""" + return entity_ids.get(entity_id) # type: ignore + + funcs.append(allowed_entity_id) + + if isinstance(domains, bool): + def allowed_domain(entity_id: str, keys: Tuple[str]) \ + -> Union[None, bool]: + """Test if allowed domain.""" + return domains + + funcs.append(allowed_domain) + + elif domains is not None: + def allowed_domain(entity_id: str, keys: Tuple[str]) \ + -> Union[None, bool]: + """Test if allowed domain.""" + domain = entity_id.split(".", 1)[0] + return domains.get(domain) # type: ignore + + funcs.append(allowed_domain) + + # Can happen if no valid subcategories specified + if not funcs: + def apply_policy_deny_all_2(entity_id: str, keys: Tuple[str]) -> bool: + """Decline all.""" + return False + + return apply_policy_deny_all_2 + + if len(funcs) == 1: + func = funcs[0] + + def apply_policy_func(entity_id: str, keys: Tuple[str]) -> bool: + """Apply a single policy function.""" + return func(entity_id, keys) is True + + return apply_policy_func + + def apply_policy_funcs(entity_id: str, keys: Tuple[str]) -> bool: + """Apply several policy functions.""" + for func in funcs: + result = func(entity_id, keys) + if result is not None: + return result + return False + + return apply_policy_funcs + + +def merge_policies(policies: List[PolicyType]) -> PolicyType: + """Merge policies.""" + new_policy = {} # type: Dict[str, CategoryType] + seen = set() # type: Set[str] + for policy in policies: + for category in policy: + if category in seen: + continue + seen.add(category) + new_policy[category] = _merge_policies([ + policy.get(category) for policy in policies]) + cast(PolicyType, new_policy) + return new_policy + + +def _merge_policies(sources: List[CategoryType]) -> CategoryType: + """Merge a policy.""" + # When merging policies, the most permissive wins. + # This means we order it like this: + # True > Dict > None + # + # True: allow everything + # Dict: specify more granular permissions + # None: no opinion + # + # If there are multiple sources with a dict as policy, we recursively + # merge each key in the source. + + policy = None # type: CategoryType + seen = set() # type: Set[str] + for source in sources: + if source is None: + continue + + # A source that's True will always win. Shortcut return. + if source is True: + return True + + assert isinstance(source, dict) + + if policy is None: + policy = {} + + assert isinstance(policy, dict) + + for key in source: + if key in seen: + continue + seen.add(key) + + key_sources = [] + for src in sources: + if isinstance(src, dict): + key_sources.append(src.get(key)) + + policy[key] = _merge_policies(key_sources) + + return policy diff --git a/tests/auth/test_init.py b/tests/auth/test_init.py index f6086db7516..4357ba1b1de 100644 --- a/tests/auth/test_init.py +++ b/tests/auth/test_init.py @@ -302,6 +302,7 @@ async def test_saving_loading(hass, hass_storage): store2 = auth_store.AuthStore(hass) users = await store2.async_get_users() assert len(users) == 1 + assert users[0].permissions == user.permissions assert users[0] == user assert len(users[0].refresh_tokens) == 2 for r_token in users[0].refresh_tokens.values(): diff --git a/tests/auth/test_models.py b/tests/auth/test_models.py new file mode 100644 index 00000000000..c84bdc7390b --- /dev/null +++ b/tests/auth/test_models.py @@ -0,0 +1,34 @@ +"""Tests for the auth models.""" +from homeassistant.auth import models, permissions + + +def test_owner_fetching_owner_permissions(): + """Test we fetch the owner permissions for an owner user.""" + group = models.Group(name="Test Group", policy={}) + owner = models.User(name="Test User", groups=[group], is_owner=True) + assert owner.permissions is permissions.OwnerPermissions + + +def test_permissions_merged(): + """Test we merge the groups permissions.""" + group = models.Group(name="Test Group", policy={ + 'entities': { + 'domains': { + 'switch': True + } + } + }) + group2 = models.Group(name="Test Group", policy={ + 'entities': { + 'entity_ids': { + 'light.kitchen': True + } + } + }) + user = models.User(name="Test User", groups=[group, group2]) + # Make sure we cache instance + assert user.permissions is user.permissions + + assert user.permissions.check_entity('switch.bla') is True + assert user.permissions.check_entity('light.kitchen') is True + assert user.permissions.check_entity('light.not_kitchen') is False diff --git a/tests/auth/test_permissions.py b/tests/auth/test_permissions.py new file mode 100644 index 00000000000..71582dc281d --- /dev/null +++ b/tests/auth/test_permissions.py @@ -0,0 +1,198 @@ +"""Tests for the auth permission system.""" +import pytest +import voluptuous as vol + +from homeassistant.core import State +from homeassistant.auth import permissions + + +def test_entities_none(): + """Test entity ID policy.""" + policy = None + compiled = permissions._compile_entities(policy) + assert compiled('light.kitchen', []) is False + + +def test_entities_empty(): + """Test entity ID policy.""" + policy = {} + permissions.ENTITY_POLICY_SCHEMA(policy) + compiled = permissions._compile_entities(policy) + assert compiled('light.kitchen', []) is False + + +def test_entities_false(): + """Test entity ID policy.""" + policy = False + with pytest.raises(vol.Invalid): + permissions.ENTITY_POLICY_SCHEMA(policy) + + +def test_entities_true(): + """Test entity ID policy.""" + policy = True + permissions.ENTITY_POLICY_SCHEMA(policy) + compiled = permissions._compile_entities(policy) + assert compiled('light.kitchen', []) is True + + +def test_entities_domains_true(): + """Test entity ID policy.""" + policy = { + 'domains': True + } + permissions.ENTITY_POLICY_SCHEMA(policy) + compiled = permissions._compile_entities(policy) + assert compiled('light.kitchen', []) is True + + +def test_entities_domains_domain_true(): + """Test entity ID policy.""" + policy = { + 'domains': { + 'light': True + } + } + permissions.ENTITY_POLICY_SCHEMA(policy) + compiled = permissions._compile_entities(policy) + assert compiled('light.kitchen', []) is True + assert compiled('switch.kitchen', []) is False + + +def test_entities_domains_domain_false(): + """Test entity ID policy.""" + policy = { + 'domains': { + 'light': False + } + } + with pytest.raises(vol.Invalid): + permissions.ENTITY_POLICY_SCHEMA(policy) + + +def test_entities_entity_ids_true(): + """Test entity ID policy.""" + policy = { + 'entity_ids': True + } + permissions.ENTITY_POLICY_SCHEMA(policy) + compiled = permissions._compile_entities(policy) + assert compiled('light.kitchen', []) is True + + +def test_entities_entity_ids_false(): + """Test entity ID policy.""" + policy = { + 'entity_ids': False + } + with pytest.raises(vol.Invalid): + permissions.ENTITY_POLICY_SCHEMA(policy) + + +def test_entities_entity_ids_entity_id_true(): + """Test entity ID policy.""" + policy = { + 'entity_ids': { + 'light.kitchen': True + } + } + permissions.ENTITY_POLICY_SCHEMA(policy) + compiled = permissions._compile_entities(policy) + assert compiled('light.kitchen', []) is True + assert compiled('switch.kitchen', []) is False + + +def test_entities_entity_ids_entity_id_false(): + """Test entity ID policy.""" + policy = { + 'entity_ids': { + 'light.kitchen': False + } + } + with pytest.raises(vol.Invalid): + permissions.ENTITY_POLICY_SCHEMA(policy) + + +def test_policy_perm_filter_states(): + """Test filtering entitites.""" + states = [ + State('light.kitchen', 'on'), + State('light.living_room', 'off'), + State('light.balcony', 'on'), + ] + perm = permissions.PolicyPermissions({ + 'entities': { + 'entity_ids': { + 'light.kitchen': True, + 'light.balcony': True, + } + } + }) + filtered = perm.filter_states(states) + assert len(filtered) == 2 + assert filtered == [states[0], states[2]] + + +def test_owner_permissions(): + """Test owner permissions access all.""" + assert permissions.OwnerPermissions.check_entity('light.kitchen') + states = [ + State('light.kitchen', 'on'), + State('light.living_room', 'off'), + State('light.balcony', 'on'), + ] + assert permissions.OwnerPermissions.filter_states(states) == states + + +def test_default_policy_allow_all(): + """Test that the default policy is to allow all entity actions.""" + perm = permissions.PolicyPermissions(permissions.DEFAULT_POLICY) + assert perm.check_entity('light.kitchen') + states = [ + State('light.kitchen', 'on'), + State('light.living_room', 'off'), + State('light.balcony', 'on'), + ] + assert perm.filter_states(states) == states + + +def test_merging_permissions_true_rules_dict(): + """Test merging policy with two entities.""" + policy1 = { + 'something_else': True, + 'entities': { + 'entity_ids': { + 'light.kitchen': True, + } + } + } + policy2 = { + 'entities': { + 'entity_ids': True + } + } + assert permissions.merge_policies([policy1, policy2]) == { + 'something_else': True, + 'entities': { + 'entity_ids': True + } + } + + +def test_merging_permissions_multiple_subcategories(): + """Test merging policy with two entities.""" + policy1 = { + 'entities': None + } + policy2 = { + 'entities': { + 'entity_ids': True, + } + } + policy3 = { + 'entities': True + } + assert permissions.merge_policies([policy1, policy2]) == policy2 + assert permissions.merge_policies([policy1, policy3]) == policy3 + + assert permissions.merge_policies([policy2, policy3]) == policy3 diff --git a/tests/common.py b/tests/common.py index ce80746be4e..44f934e4cb3 100644 --- a/tests/common.py +++ b/tests/common.py @@ -348,10 +348,12 @@ def mock_device_registry(hass, mock_entries=None): class MockGroup(auth_models.Group): """Mock a group in Home Assistant.""" - def __init__(self, id=None, name='Mock Group'): + def __init__(self, id=None, name='Mock Group', + policy=auth_store.DEFAULT_POLICY): """Mock a group.""" kwargs = { - 'name': name + 'name': name, + 'policy': policy, } if id is not None: kwargs['id'] = id