diff --git a/homeassistant/auth/permissions.py b/homeassistant/auth/permissions.py deleted file mode 100644 index 82de61da7f9..00000000000 --- a/homeassistant/auth/permissions.py +++ /dev/null @@ -1,252 +0,0 @@ -"""Permissions for Home Assistant.""" -from typing import ( # noqa: F401 - cast, Any, Callable, Dict, List, Mapping, Set, Tuple, Union) - -import voluptuous as vol - -from homeassistant.core import State - -CategoryType = Union[Mapping[str, 'CategoryType'], bool, None] -PolicyType = Mapping[str, CategoryType] - - -# Default policy if group has no policy applied. -DEFAULT_POLICY = { - "entities": True -} # type: PolicyType - -CAT_ENTITIES = 'entities' -ENTITY_DOMAINS = 'domains' -ENTITY_ENTITY_IDS = 'entity_ids' - -VALUES_SCHEMA = vol.Any(True, vol.Schema({ - str: True -})) - -ENTITY_POLICY_SCHEMA = vol.Any(True, vol.Schema({ - vol.Optional(ENTITY_DOMAINS): VALUES_SCHEMA, - vol.Optional(ENTITY_ENTITY_IDS): VALUES_SCHEMA, -})) - -POLICY_SCHEMA = vol.Schema({ - vol.Optional(CAT_ENTITIES): ENTITY_POLICY_SCHEMA -}) - - -class AbstractPermissions: - """Default permissions class.""" - - def check_entity(self, entity_id: str, *keys: str) -> bool: - """Test if we can access entity.""" - raise NotImplementedError - - def filter_states(self, states: List[State]) -> List[State]: - """Filter a list of states for what the user is allowed to see.""" - raise NotImplementedError - - -class PolicyPermissions(AbstractPermissions): - """Handle permissions.""" - - def __init__(self, policy: PolicyType) -> None: - """Initialize the permission class.""" - self._policy = policy - self._compiled = {} # type: Dict[str, Callable[..., bool]] - - def check_entity(self, entity_id: str, *keys: str) -> bool: - """Test if we can access entity.""" - func = self._policy_func(CAT_ENTITIES, _compile_entities) - return func(entity_id, keys) - - def filter_states(self, states: List[State]) -> List[State]: - """Filter a list of states for what the user is allowed to see.""" - func = self._policy_func(CAT_ENTITIES, _compile_entities) - keys = ('read',) - return [entity for entity in states if func(entity.entity_id, keys)] - - def _policy_func(self, category: str, - compile_func: Callable[[CategoryType], Callable]) \ - -> Callable[..., bool]: - """Get a policy function.""" - func = self._compiled.get(category) - - if func: - return func - - func = self._compiled[category] = compile_func( - self._policy.get(category)) - return func - - def __eq__(self, other: Any) -> bool: - """Equals check.""" - # pylint: disable=protected-access - return (isinstance(other, PolicyPermissions) and - other._policy == self._policy) - - -class _OwnerPermissions(AbstractPermissions): - """Owner permissions.""" - - # pylint: disable=no-self-use - - def check_entity(self, entity_id: str, *keys: str) -> bool: - """Test if we can access entity.""" - return True - - def filter_states(self, states: List[State]) -> List[State]: - """Filter a list of states for what the user is allowed to see.""" - return states - - -OwnerPermissions = _OwnerPermissions() # pylint: disable=invalid-name - - -def _compile_entities(policy: CategoryType) \ - -> Callable[[str, Tuple[str]], bool]: - """Compile policy into a function that tests policy.""" - # None, Empty Dict, False - if not policy: - def apply_policy_deny_all(entity_id: str, keys: Tuple[str]) -> bool: - """Decline all.""" - return False - - return apply_policy_deny_all - - if policy is True: - def apply_policy_allow_all(entity_id: str, keys: Tuple[str]) -> bool: - """Approve all.""" - return True - - return apply_policy_allow_all - - assert isinstance(policy, dict) - - domains = policy.get(ENTITY_DOMAINS) - entity_ids = policy.get(ENTITY_ENTITY_IDS) - - funcs = [] # type: List[Callable[[str, Tuple[str]], Union[None, bool]]] - - # The order of these functions matter. The more precise are at the top. - # If a function returns None, they cannot handle it. - # If a function returns a boolean, that's the result to return. - - # Setting entity_ids to a boolean is final decision for permissions - # So return right away. - if isinstance(entity_ids, bool): - def apply_entity_id_policy(entity_id: str, keys: Tuple[str]) -> bool: - """Test if allowed entity_id.""" - return entity_ids # type: ignore - - return apply_entity_id_policy - - if entity_ids is not None: - def allowed_entity_id(entity_id: str, keys: Tuple[str]) \ - -> Union[None, bool]: - """Test if allowed entity_id.""" - return entity_ids.get(entity_id) # type: ignore - - funcs.append(allowed_entity_id) - - if isinstance(domains, bool): - def allowed_domain(entity_id: str, keys: Tuple[str]) \ - -> Union[None, bool]: - """Test if allowed domain.""" - return domains - - funcs.append(allowed_domain) - - elif domains is not None: - def allowed_domain(entity_id: str, keys: Tuple[str]) \ - -> Union[None, bool]: - """Test if allowed domain.""" - domain = entity_id.split(".", 1)[0] - return domains.get(domain) # type: ignore - - funcs.append(allowed_domain) - - # Can happen if no valid subcategories specified - if not funcs: - def apply_policy_deny_all_2(entity_id: str, keys: Tuple[str]) -> bool: - """Decline all.""" - return False - - return apply_policy_deny_all_2 - - if len(funcs) == 1: - func = funcs[0] - - def apply_policy_func(entity_id: str, keys: Tuple[str]) -> bool: - """Apply a single policy function.""" - return func(entity_id, keys) is True - - return apply_policy_func - - def apply_policy_funcs(entity_id: str, keys: Tuple[str]) -> bool: - """Apply several policy functions.""" - for func in funcs: - result = func(entity_id, keys) - if result is not None: - return result - return False - - return apply_policy_funcs - - -def merge_policies(policies: List[PolicyType]) -> PolicyType: - """Merge policies.""" - new_policy = {} # type: Dict[str, CategoryType] - seen = set() # type: Set[str] - for policy in policies: - for category in policy: - if category in seen: - continue - seen.add(category) - new_policy[category] = _merge_policies([ - policy.get(category) for policy in policies]) - cast(PolicyType, new_policy) - return new_policy - - -def _merge_policies(sources: List[CategoryType]) -> CategoryType: - """Merge a policy.""" - # When merging policies, the most permissive wins. - # This means we order it like this: - # True > Dict > None - # - # True: allow everything - # Dict: specify more granular permissions - # None: no opinion - # - # If there are multiple sources with a dict as policy, we recursively - # merge each key in the source. - - policy = None # type: CategoryType - seen = set() # type: Set[str] - for source in sources: - if source is None: - continue - - # A source that's True will always win. Shortcut return. - if source is True: - return True - - assert isinstance(source, dict) - - if policy is None: - policy = {} - - assert isinstance(policy, dict) - - for key in source: - if key in seen: - continue - seen.add(key) - - key_sources = [] - for src in sources: - if isinstance(src, dict): - key_sources.append(src.get(key)) - - policy[key] = _merge_policies(key_sources) - - return policy diff --git a/homeassistant/auth/permissions/__init__.py b/homeassistant/auth/permissions/__init__.py new file mode 100644 index 00000000000..ee0d3af0c54 --- /dev/null +++ b/homeassistant/auth/permissions/__init__.py @@ -0,0 +1,97 @@ +"""Permissions for Home Assistant.""" +import logging +from typing import ( # noqa: F401 + cast, Any, Callable, Dict, List, Mapping, Set, Tuple, Union) + +import voluptuous as vol + +from homeassistant.core import State + +from .common import CategoryType, PolicyType +from .entities import ENTITY_POLICY_SCHEMA, compile_entities +from .merge import merge_policies # noqa + + +# Default policy if group has no policy applied. +DEFAULT_POLICY = { + "entities": True +} # type: PolicyType + +CAT_ENTITIES = 'entities' + +POLICY_SCHEMA = vol.Schema({ + vol.Optional(CAT_ENTITIES): ENTITY_POLICY_SCHEMA +}) + +_LOGGER = logging.getLogger(__name__) + + +class AbstractPermissions: + """Default permissions class.""" + + def check_entity(self, entity_id: str, key: str) -> bool: + """Test if we can access entity.""" + raise NotImplementedError + + def filter_states(self, states: List[State]) -> List[State]: + """Filter a list of states for what the user is allowed to see.""" + raise NotImplementedError + + +class PolicyPermissions(AbstractPermissions): + """Handle permissions.""" + + def __init__(self, policy: PolicyType) -> None: + """Initialize the permission class.""" + self._policy = policy + self._compiled = {} # type: Dict[str, Callable[..., bool]] + + def check_entity(self, entity_id: str, key: str) -> bool: + """Test if we can access entity.""" + func = self._policy_func(CAT_ENTITIES, compile_entities) + return func(entity_id, (key,)) + + def filter_states(self, states: List[State]) -> List[State]: + """Filter a list of states for what the user is allowed to see.""" + func = self._policy_func(CAT_ENTITIES, compile_entities) + keys = ('read',) + return [entity for entity in states if func(entity.entity_id, keys)] + + def _policy_func(self, category: str, + compile_func: Callable[[CategoryType], Callable]) \ + -> Callable[..., bool]: + """Get a policy function.""" + func = self._compiled.get(category) + + if func: + return func + + func = self._compiled[category] = compile_func( + self._policy.get(category)) + + _LOGGER.debug("Compiled %s func: %s", category, func) + + return func + + def __eq__(self, other: Any) -> bool: + """Equals check.""" + # pylint: disable=protected-access + return (isinstance(other, PolicyPermissions) and + other._policy == self._policy) + + +class _OwnerPermissions(AbstractPermissions): + """Owner permissions.""" + + # pylint: disable=no-self-use + + def check_entity(self, entity_id: str, key: str) -> bool: + """Test if we can access entity.""" + return True + + def filter_states(self, states: List[State]) -> List[State]: + """Filter a list of states for what the user is allowed to see.""" + return states + + +OwnerPermissions = _OwnerPermissions() # pylint: disable=invalid-name diff --git a/homeassistant/auth/permissions/common.py b/homeassistant/auth/permissions/common.py new file mode 100644 index 00000000000..f87f9d70ddf --- /dev/null +++ b/homeassistant/auth/permissions/common.py @@ -0,0 +1,33 @@ +"""Common code for permissions.""" +from typing import ( # noqa: F401 + Mapping, Union, Any) + +# MyPy doesn't support recursion yet. So writing it out as far as we need. + +ValueType = Union[ + # Example: entities.all = { read: true, control: true } + Mapping[str, bool], + bool, + None +] + +SubCategoryType = Union[ + # Example: entities.domains = { light: … } + Mapping[str, ValueType], + bool, + None +] + +CategoryType = Union[ + # Example: entities.domains + Mapping[str, SubCategoryType], + # Example: entities.all + Mapping[str, ValueType], + bool, + None +] + +# Example: { entities: … } +PolicyType = Mapping[str, CategoryType] + +SUBCAT_ALL = 'all' diff --git a/homeassistant/auth/permissions/entities.py b/homeassistant/auth/permissions/entities.py new file mode 100644 index 00000000000..b38600fe130 --- /dev/null +++ b/homeassistant/auth/permissions/entities.py @@ -0,0 +1,149 @@ +"""Entity permissions.""" +from functools import wraps +from typing import ( # noqa: F401 + Callable, Dict, List, Tuple, Union) + +import voluptuous as vol + +from .common import CategoryType, ValueType, SUBCAT_ALL + + +POLICY_READ = 'read' +POLICY_CONTROL = 'control' +POLICY_EDIT = 'edit' + +SINGLE_ENTITY_SCHEMA = vol.Any(True, vol.Schema({ + vol.Optional(POLICY_READ): True, + vol.Optional(POLICY_CONTROL): True, + vol.Optional(POLICY_EDIT): True, +})) + +ENTITY_DOMAINS = 'domains' +ENTITY_ENTITY_IDS = 'entity_ids' + +ENTITY_VALUES_SCHEMA = vol.Any(True, vol.Schema({ + str: SINGLE_ENTITY_SCHEMA +})) + +ENTITY_POLICY_SCHEMA = vol.Any(True, vol.Schema({ + vol.Optional(SUBCAT_ALL): SINGLE_ENTITY_SCHEMA, + vol.Optional(ENTITY_DOMAINS): ENTITY_VALUES_SCHEMA, + vol.Optional(ENTITY_ENTITY_IDS): ENTITY_VALUES_SCHEMA, +})) + + +def _entity_allowed(schema: ValueType, keys: Tuple[str]) \ + -> Union[bool, None]: + """Test if an entity is allowed based on the keys.""" + if schema is None or isinstance(schema, bool): + return schema + assert isinstance(schema, dict) + return schema.get(keys[0]) + + +def compile_entities(policy: CategoryType) \ + -> Callable[[str, Tuple[str]], bool]: + """Compile policy into a function that tests policy.""" + # None, Empty Dict, False + if not policy: + def apply_policy_deny_all(entity_id: str, keys: Tuple[str]) -> bool: + """Decline all.""" + return False + + return apply_policy_deny_all + + if policy is True: + def apply_policy_allow_all(entity_id: str, keys: Tuple[str]) -> bool: + """Approve all.""" + return True + + return apply_policy_allow_all + + assert isinstance(policy, dict) + + domains = policy.get(ENTITY_DOMAINS) + entity_ids = policy.get(ENTITY_ENTITY_IDS) + all_entities = policy.get(SUBCAT_ALL) + + funcs = [] # type: List[Callable[[str, Tuple[str]], Union[None, bool]]] + + # The order of these functions matter. The more precise are at the top. + # If a function returns None, they cannot handle it. + # If a function returns a boolean, that's the result to return. + + # Setting entity_ids to a boolean is final decision for permissions + # So return right away. + if isinstance(entity_ids, bool): + def allowed_entity_id_bool(entity_id: str, keys: Tuple[str]) -> bool: + """Test if allowed entity_id.""" + return entity_ids # type: ignore + + return allowed_entity_id_bool + + if entity_ids is not None: + def allowed_entity_id_dict(entity_id: str, keys: Tuple[str]) \ + -> Union[None, bool]: + """Test if allowed entity_id.""" + return _entity_allowed( + entity_ids.get(entity_id), keys) # type: ignore + + funcs.append(allowed_entity_id_dict) + + if isinstance(domains, bool): + def allowed_domain_bool(entity_id: str, keys: Tuple[str]) \ + -> Union[None, bool]: + """Test if allowed domain.""" + return domains + + funcs.append(allowed_domain_bool) + + elif domains is not None: + def allowed_domain_dict(entity_id: str, keys: Tuple[str]) \ + -> Union[None, bool]: + """Test if allowed domain.""" + domain = entity_id.split(".", 1)[0] + return _entity_allowed(domains.get(domain), keys) # type: ignore + + funcs.append(allowed_domain_dict) + + if isinstance(all_entities, bool): + def allowed_all_entities_bool(entity_id: str, keys: Tuple[str]) \ + -> Union[None, bool]: + """Test if allowed domain.""" + return all_entities + funcs.append(allowed_all_entities_bool) + + elif all_entities is not None: + def allowed_all_entities_dict(entity_id: str, keys: Tuple[str]) \ + -> Union[None, bool]: + """Test if allowed domain.""" + return _entity_allowed(all_entities, keys) + funcs.append(allowed_all_entities_dict) + + # Can happen if no valid subcategories specified + if not funcs: + def apply_policy_deny_all_2(entity_id: str, keys: Tuple[str]) -> bool: + """Decline all.""" + return False + + return apply_policy_deny_all_2 + + if len(funcs) == 1: + func = funcs[0] + + @wraps(func) + def apply_policy_func(entity_id: str, keys: Tuple[str]) -> bool: + """Apply a single policy function.""" + return func(entity_id, keys) is True + + return apply_policy_func + + def apply_policy_funcs(entity_id: str, keys: Tuple[str]) -> bool: + """Apply several policy functions.""" + for func in funcs: + result = func(entity_id, keys) + if result is not None: + return result + return False + + return apply_policy_funcs diff --git a/homeassistant/auth/permissions/merge.py b/homeassistant/auth/permissions/merge.py new file mode 100644 index 00000000000..32cbfefcf1c --- /dev/null +++ b/homeassistant/auth/permissions/merge.py @@ -0,0 +1,65 @@ +"""Merging of policies.""" +from typing import ( # noqa: F401 + cast, Dict, List, Set) + +from .common import PolicyType, CategoryType + + +def merge_policies(policies: List[PolicyType]) -> PolicyType: + """Merge policies.""" + new_policy = {} # type: Dict[str, CategoryType] + seen = set() # type: Set[str] + for policy in policies: + for category in policy: + if category in seen: + continue + seen.add(category) + new_policy[category] = _merge_policies([ + policy.get(category) for policy in policies]) + cast(PolicyType, new_policy) + return new_policy + + +def _merge_policies(sources: List[CategoryType]) -> CategoryType: + """Merge a policy.""" + # When merging policies, the most permissive wins. + # This means we order it like this: + # True > Dict > None + # + # True: allow everything + # Dict: specify more granular permissions + # None: no opinion + # + # If there are multiple sources with a dict as policy, we recursively + # merge each key in the source. + + policy = None # type: CategoryType + seen = set() # type: Set[str] + for source in sources: + if source is None: + continue + + # A source that's True will always win. Shortcut return. + if source is True: + return True + + assert isinstance(source, dict) + + if policy is None: + policy = cast(CategoryType, {}) + + assert isinstance(policy, dict) + + for key in source: + if key in seen: + continue + seen.add(key) + + key_sources = [] + for src in sources: + if isinstance(src, dict): + key_sources.append(src.get(key)) + + policy[key] = _merge_policies(key_sources) + + return policy diff --git a/tests/auth/permissions/__init__.py b/tests/auth/permissions/__init__.py new file mode 100644 index 00000000000..dd0343dadc3 --- /dev/null +++ b/tests/auth/permissions/__init__.py @@ -0,0 +1 @@ +"""Tests for permissions.""" diff --git a/tests/auth/permissions/test_entities.py b/tests/auth/permissions/test_entities.py new file mode 100644 index 00000000000..33c164d12b4 --- /dev/null +++ b/tests/auth/permissions/test_entities.py @@ -0,0 +1,187 @@ +"""Tests for entity permissions.""" +import pytest +import voluptuous as vol + +from homeassistant.auth.permissions.entities import ( + compile_entities, ENTITY_POLICY_SCHEMA) + + +def test_entities_none(): + """Test entity ID policy.""" + policy = None + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is False + + +def test_entities_empty(): + """Test entity ID policy.""" + policy = {} + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is False + + +def test_entities_false(): + """Test entity ID policy.""" + policy = False + with pytest.raises(vol.Invalid): + ENTITY_POLICY_SCHEMA(policy) + + +def test_entities_true(): + """Test entity ID policy.""" + policy = True + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + + +def test_entities_domains_true(): + """Test entity ID policy.""" + policy = { + 'domains': True + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + + +def test_entities_domains_domain_true(): + """Test entity ID policy.""" + policy = { + 'domains': { + 'light': True + } + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + assert compiled('switch.kitchen', ('read',)) is False + + +def test_entities_domains_domain_false(): + """Test entity ID policy.""" + policy = { + 'domains': { + 'light': False + } + } + with pytest.raises(vol.Invalid): + ENTITY_POLICY_SCHEMA(policy) + + +def test_entities_entity_ids_true(): + """Test entity ID policy.""" + policy = { + 'entity_ids': True + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + + +def test_entities_entity_ids_false(): + """Test entity ID policy.""" + policy = { + 'entity_ids': False + } + with pytest.raises(vol.Invalid): + ENTITY_POLICY_SCHEMA(policy) + + +def test_entities_entity_ids_entity_id_true(): + """Test entity ID policy.""" + policy = { + 'entity_ids': { + 'light.kitchen': True + } + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + assert compiled('switch.kitchen', ('read',)) is False + + +def test_entities_entity_ids_entity_id_false(): + """Test entity ID policy.""" + policy = { + 'entity_ids': { + 'light.kitchen': False + } + } + with pytest.raises(vol.Invalid): + ENTITY_POLICY_SCHEMA(policy) + + +def test_entities_control_only(): + """Test policy granting control only.""" + policy = { + 'entity_ids': { + 'light.kitchen': { + 'read': True, + } + } + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + assert compiled('light.kitchen', ('control',)) is False + assert compiled('light.kitchen', ('edit',)) is False + + +def test_entities_read_control(): + """Test policy granting control only.""" + policy = { + 'domains': { + 'light': { + 'read': True, + 'control': True, + } + } + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + assert compiled('light.kitchen', ('control',)) is True + assert compiled('light.kitchen', ('edit',)) is False + + +def test_entities_all_allow(): + """Test policy allowing all entities.""" + policy = { + 'all': True + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + assert compiled('light.kitchen', ('control',)) is True + assert compiled('switch.kitchen', ('read',)) is True + + +def test_entities_all_read(): + """Test policy applying read to all entities.""" + policy = { + 'all': { + 'read': True + } + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is True + assert compiled('light.kitchen', ('control',)) is False + assert compiled('switch.kitchen', ('read',)) is True + + +def test_entities_all_control(): + """Test entity ID policy applying control to all.""" + policy = { + 'all': { + 'control': True + } + } + ENTITY_POLICY_SCHEMA(policy) + compiled = compile_entities(policy) + assert compiled('light.kitchen', ('read',)) is False + assert compiled('light.kitchen', ('control',)) is True + assert compiled('switch.kitchen', ('read',)) is False + assert compiled('switch.kitchen', ('control',)) is True diff --git a/tests/auth/permissions/test_init.py b/tests/auth/permissions/test_init.py new file mode 100644 index 00000000000..60ec3cb4314 --- /dev/null +++ b/tests/auth/permissions/test_init.py @@ -0,0 +1,46 @@ +"""Tests for the auth permission system.""" +from homeassistant.core import State +from homeassistant.auth import permissions + + +def test_policy_perm_filter_states(): + """Test filtering entitites.""" + states = [ + State('light.kitchen', 'on'), + State('light.living_room', 'off'), + State('light.balcony', 'on'), + ] + perm = permissions.PolicyPermissions({ + 'entities': { + 'entity_ids': { + 'light.kitchen': True, + 'light.balcony': True, + } + } + }) + filtered = perm.filter_states(states) + assert len(filtered) == 2 + assert filtered == [states[0], states[2]] + + +def test_owner_permissions(): + """Test owner permissions access all.""" + assert permissions.OwnerPermissions.check_entity('light.kitchen', 'write') + states = [ + State('light.kitchen', 'on'), + State('light.living_room', 'off'), + State('light.balcony', 'on'), + ] + assert permissions.OwnerPermissions.filter_states(states) == states + + +def test_default_policy_allow_all(): + """Test that the default policy is to allow all entity actions.""" + perm = permissions.PolicyPermissions(permissions.DEFAULT_POLICY) + assert perm.check_entity('light.kitchen', 'read') + states = [ + State('light.kitchen', 'on'), + State('light.living_room', 'off'), + State('light.balcony', 'on'), + ] + assert perm.filter_states(states) == states diff --git a/tests/auth/permissions/test_merge.py b/tests/auth/permissions/test_merge.py new file mode 100644 index 00000000000..901e027a146 --- /dev/null +++ b/tests/auth/permissions/test_merge.py @@ -0,0 +1,44 @@ +"""Tests for permissions merging.""" +from homeassistant.auth.permissions.merge import merge_policies + + +def test_merging_permissions_true_rules_dict(): + """Test merging policy with two entities.""" + policy1 = { + 'something_else': True, + 'entities': { + 'entity_ids': { + 'light.kitchen': True, + } + } + } + policy2 = { + 'entities': { + 'entity_ids': True + } + } + assert merge_policies([policy1, policy2]) == { + 'something_else': True, + 'entities': { + 'entity_ids': True + } + } + + +def test_merging_permissions_multiple_subcategories(): + """Test merging policy with two entities.""" + policy1 = { + 'entities': None + } + policy2 = { + 'entities': { + 'entity_ids': True, + } + } + policy3 = { + 'entities': True + } + assert merge_policies([policy1, policy2]) == policy2 + assert merge_policies([policy1, policy3]) == policy3 + + assert merge_policies([policy2, policy3]) == policy3 diff --git a/tests/auth/test_models.py b/tests/auth/test_models.py index c84bdc7390b..b02111e8d02 100644 --- a/tests/auth/test_models.py +++ b/tests/auth/test_models.py @@ -29,6 +29,6 @@ def test_permissions_merged(): # Make sure we cache instance assert user.permissions is user.permissions - assert user.permissions.check_entity('switch.bla') is True - assert user.permissions.check_entity('light.kitchen') is True - assert user.permissions.check_entity('light.not_kitchen') is False + assert user.permissions.check_entity('switch.bla', 'read') is True + assert user.permissions.check_entity('light.kitchen', 'read') is True + assert user.permissions.check_entity('light.not_kitchen', 'read') is False diff --git a/tests/auth/test_permissions.py b/tests/auth/test_permissions.py deleted file mode 100644 index 71582dc281d..00000000000 --- a/tests/auth/test_permissions.py +++ /dev/null @@ -1,198 +0,0 @@ -"""Tests for the auth permission system.""" -import pytest -import voluptuous as vol - -from homeassistant.core import State -from homeassistant.auth import permissions - - -def test_entities_none(): - """Test entity ID policy.""" - policy = None - compiled = permissions._compile_entities(policy) - assert compiled('light.kitchen', []) is False - - -def test_entities_empty(): - """Test entity ID policy.""" - policy = {} - permissions.ENTITY_POLICY_SCHEMA(policy) - compiled = permissions._compile_entities(policy) - assert compiled('light.kitchen', []) is False - - -def test_entities_false(): - """Test entity ID policy.""" - policy = False - with pytest.raises(vol.Invalid): - permissions.ENTITY_POLICY_SCHEMA(policy) - - -def test_entities_true(): - """Test entity ID policy.""" - policy = True - permissions.ENTITY_POLICY_SCHEMA(policy) - compiled = permissions._compile_entities(policy) - assert compiled('light.kitchen', []) is True - - -def test_entities_domains_true(): - """Test entity ID policy.""" - policy = { - 'domains': True - } - permissions.ENTITY_POLICY_SCHEMA(policy) - compiled = permissions._compile_entities(policy) - assert compiled('light.kitchen', []) is True - - -def test_entities_domains_domain_true(): - """Test entity ID policy.""" - policy = { - 'domains': { - 'light': True - } - } - permissions.ENTITY_POLICY_SCHEMA(policy) - compiled = permissions._compile_entities(policy) - assert compiled('light.kitchen', []) is True - assert compiled('switch.kitchen', []) is False - - -def test_entities_domains_domain_false(): - """Test entity ID policy.""" - policy = { - 'domains': { - 'light': False - } - } - with pytest.raises(vol.Invalid): - permissions.ENTITY_POLICY_SCHEMA(policy) - - -def test_entities_entity_ids_true(): - """Test entity ID policy.""" - policy = { - 'entity_ids': True - } - permissions.ENTITY_POLICY_SCHEMA(policy) - compiled = permissions._compile_entities(policy) - assert compiled('light.kitchen', []) is True - - -def test_entities_entity_ids_false(): - """Test entity ID policy.""" - policy = { - 'entity_ids': False - } - with pytest.raises(vol.Invalid): - permissions.ENTITY_POLICY_SCHEMA(policy) - - -def test_entities_entity_ids_entity_id_true(): - """Test entity ID policy.""" - policy = { - 'entity_ids': { - 'light.kitchen': True - } - } - permissions.ENTITY_POLICY_SCHEMA(policy) - compiled = permissions._compile_entities(policy) - assert compiled('light.kitchen', []) is True - assert compiled('switch.kitchen', []) is False - - -def test_entities_entity_ids_entity_id_false(): - """Test entity ID policy.""" - policy = { - 'entity_ids': { - 'light.kitchen': False - } - } - with pytest.raises(vol.Invalid): - permissions.ENTITY_POLICY_SCHEMA(policy) - - -def test_policy_perm_filter_states(): - """Test filtering entitites.""" - states = [ - State('light.kitchen', 'on'), - State('light.living_room', 'off'), - State('light.balcony', 'on'), - ] - perm = permissions.PolicyPermissions({ - 'entities': { - 'entity_ids': { - 'light.kitchen': True, - 'light.balcony': True, - } - } - }) - filtered = perm.filter_states(states) - assert len(filtered) == 2 - assert filtered == [states[0], states[2]] - - -def test_owner_permissions(): - """Test owner permissions access all.""" - assert permissions.OwnerPermissions.check_entity('light.kitchen') - states = [ - State('light.kitchen', 'on'), - State('light.living_room', 'off'), - State('light.balcony', 'on'), - ] - assert permissions.OwnerPermissions.filter_states(states) == states - - -def test_default_policy_allow_all(): - """Test that the default policy is to allow all entity actions.""" - perm = permissions.PolicyPermissions(permissions.DEFAULT_POLICY) - assert perm.check_entity('light.kitchen') - states = [ - State('light.kitchen', 'on'), - State('light.living_room', 'off'), - State('light.balcony', 'on'), - ] - assert perm.filter_states(states) == states - - -def test_merging_permissions_true_rules_dict(): - """Test merging policy with two entities.""" - policy1 = { - 'something_else': True, - 'entities': { - 'entity_ids': { - 'light.kitchen': True, - } - } - } - policy2 = { - 'entities': { - 'entity_ids': True - } - } - assert permissions.merge_policies([policy1, policy2]) == { - 'something_else': True, - 'entities': { - 'entity_ids': True - } - } - - -def test_merging_permissions_multiple_subcategories(): - """Test merging policy with two entities.""" - policy1 = { - 'entities': None - } - policy2 = { - 'entities': { - 'entity_ids': True, - } - } - policy3 = { - 'entities': True - } - assert permissions.merge_policies([policy1, policy2]) == policy2 - assert permissions.merge_policies([policy1, policy3]) == policy3 - - assert permissions.merge_policies([policy2, policy3]) == policy3