mirror of
https://github.com/home-assistant/core.git
synced 2025-04-24 01:08:12 +00:00
Permissions improv (#17811)
* Break up permissions file. * Granular entity permissions * Add "all" entity permission * Lint * Fix types
This commit is contained in:
parent
d1ef875132
commit
f4ac317d64
@ -1,252 +0,0 @@
|
||||
"""Permissions for Home Assistant."""
|
||||
from typing import ( # noqa: F401
|
||||
cast, Any, Callable, Dict, List, Mapping, Set, Tuple, Union)
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.core import State
|
||||
|
||||
CategoryType = Union[Mapping[str, 'CategoryType'], bool, None]
|
||||
PolicyType = Mapping[str, CategoryType]
|
||||
|
||||
|
||||
# Default policy if group has no policy applied.
|
||||
DEFAULT_POLICY = {
|
||||
"entities": True
|
||||
} # type: PolicyType
|
||||
|
||||
CAT_ENTITIES = 'entities'
|
||||
ENTITY_DOMAINS = 'domains'
|
||||
ENTITY_ENTITY_IDS = 'entity_ids'
|
||||
|
||||
VALUES_SCHEMA = vol.Any(True, vol.Schema({
|
||||
str: True
|
||||
}))
|
||||
|
||||
ENTITY_POLICY_SCHEMA = vol.Any(True, vol.Schema({
|
||||
vol.Optional(ENTITY_DOMAINS): VALUES_SCHEMA,
|
||||
vol.Optional(ENTITY_ENTITY_IDS): VALUES_SCHEMA,
|
||||
}))
|
||||
|
||||
POLICY_SCHEMA = vol.Schema({
|
||||
vol.Optional(CAT_ENTITIES): ENTITY_POLICY_SCHEMA
|
||||
})
|
||||
|
||||
|
||||
class AbstractPermissions:
|
||||
"""Default permissions class."""
|
||||
|
||||
def check_entity(self, entity_id: str, *keys: str) -> bool:
|
||||
"""Test if we can access entity."""
|
||||
raise NotImplementedError
|
||||
|
||||
def filter_states(self, states: List[State]) -> List[State]:
|
||||
"""Filter a list of states for what the user is allowed to see."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class PolicyPermissions(AbstractPermissions):
|
||||
"""Handle permissions."""
|
||||
|
||||
def __init__(self, policy: PolicyType) -> None:
|
||||
"""Initialize the permission class."""
|
||||
self._policy = policy
|
||||
self._compiled = {} # type: Dict[str, Callable[..., bool]]
|
||||
|
||||
def check_entity(self, entity_id: str, *keys: str) -> bool:
|
||||
"""Test if we can access entity."""
|
||||
func = self._policy_func(CAT_ENTITIES, _compile_entities)
|
||||
return func(entity_id, keys)
|
||||
|
||||
def filter_states(self, states: List[State]) -> List[State]:
|
||||
"""Filter a list of states for what the user is allowed to see."""
|
||||
func = self._policy_func(CAT_ENTITIES, _compile_entities)
|
||||
keys = ('read',)
|
||||
return [entity for entity in states if func(entity.entity_id, keys)]
|
||||
|
||||
def _policy_func(self, category: str,
|
||||
compile_func: Callable[[CategoryType], Callable]) \
|
||||
-> Callable[..., bool]:
|
||||
"""Get a policy function."""
|
||||
func = self._compiled.get(category)
|
||||
|
||||
if func:
|
||||
return func
|
||||
|
||||
func = self._compiled[category] = compile_func(
|
||||
self._policy.get(category))
|
||||
return func
|
||||
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
"""Equals check."""
|
||||
# pylint: disable=protected-access
|
||||
return (isinstance(other, PolicyPermissions) and
|
||||
other._policy == self._policy)
|
||||
|
||||
|
||||
class _OwnerPermissions(AbstractPermissions):
|
||||
"""Owner permissions."""
|
||||
|
||||
# pylint: disable=no-self-use
|
||||
|
||||
def check_entity(self, entity_id: str, *keys: str) -> bool:
|
||||
"""Test if we can access entity."""
|
||||
return True
|
||||
|
||||
def filter_states(self, states: List[State]) -> List[State]:
|
||||
"""Filter a list of states for what the user is allowed to see."""
|
||||
return states
|
||||
|
||||
|
||||
OwnerPermissions = _OwnerPermissions() # pylint: disable=invalid-name
|
||||
|
||||
|
||||
def _compile_entities(policy: CategoryType) \
|
||||
-> Callable[[str, Tuple[str]], bool]:
|
||||
"""Compile policy into a function that tests policy."""
|
||||
# None, Empty Dict, False
|
||||
if not policy:
|
||||
def apply_policy_deny_all(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Decline all."""
|
||||
return False
|
||||
|
||||
return apply_policy_deny_all
|
||||
|
||||
if policy is True:
|
||||
def apply_policy_allow_all(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Approve all."""
|
||||
return True
|
||||
|
||||
return apply_policy_allow_all
|
||||
|
||||
assert isinstance(policy, dict)
|
||||
|
||||
domains = policy.get(ENTITY_DOMAINS)
|
||||
entity_ids = policy.get(ENTITY_ENTITY_IDS)
|
||||
|
||||
funcs = [] # type: List[Callable[[str, Tuple[str]], Union[None, bool]]]
|
||||
|
||||
# The order of these functions matter. The more precise are at the top.
|
||||
# If a function returns None, they cannot handle it.
|
||||
# If a function returns a boolean, that's the result to return.
|
||||
|
||||
# Setting entity_ids to a boolean is final decision for permissions
|
||||
# So return right away.
|
||||
if isinstance(entity_ids, bool):
|
||||
def apply_entity_id_policy(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Test if allowed entity_id."""
|
||||
return entity_ids # type: ignore
|
||||
|
||||
return apply_entity_id_policy
|
||||
|
||||
if entity_ids is not None:
|
||||
def allowed_entity_id(entity_id: str, keys: Tuple[str]) \
|
||||
-> Union[None, bool]:
|
||||
"""Test if allowed entity_id."""
|
||||
return entity_ids.get(entity_id) # type: ignore
|
||||
|
||||
funcs.append(allowed_entity_id)
|
||||
|
||||
if isinstance(domains, bool):
|
||||
def allowed_domain(entity_id: str, keys: Tuple[str]) \
|
||||
-> Union[None, bool]:
|
||||
"""Test if allowed domain."""
|
||||
return domains
|
||||
|
||||
funcs.append(allowed_domain)
|
||||
|
||||
elif domains is not None:
|
||||
def allowed_domain(entity_id: str, keys: Tuple[str]) \
|
||||
-> Union[None, bool]:
|
||||
"""Test if allowed domain."""
|
||||
domain = entity_id.split(".", 1)[0]
|
||||
return domains.get(domain) # type: ignore
|
||||
|
||||
funcs.append(allowed_domain)
|
||||
|
||||
# Can happen if no valid subcategories specified
|
||||
if not funcs:
|
||||
def apply_policy_deny_all_2(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Decline all."""
|
||||
return False
|
||||
|
||||
return apply_policy_deny_all_2
|
||||
|
||||
if len(funcs) == 1:
|
||||
func = funcs[0]
|
||||
|
||||
def apply_policy_func(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Apply a single policy function."""
|
||||
return func(entity_id, keys) is True
|
||||
|
||||
return apply_policy_func
|
||||
|
||||
def apply_policy_funcs(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Apply several policy functions."""
|
||||
for func in funcs:
|
||||
result = func(entity_id, keys)
|
||||
if result is not None:
|
||||
return result
|
||||
return False
|
||||
|
||||
return apply_policy_funcs
|
||||
|
||||
|
||||
def merge_policies(policies: List[PolicyType]) -> PolicyType:
|
||||
"""Merge policies."""
|
||||
new_policy = {} # type: Dict[str, CategoryType]
|
||||
seen = set() # type: Set[str]
|
||||
for policy in policies:
|
||||
for category in policy:
|
||||
if category in seen:
|
||||
continue
|
||||
seen.add(category)
|
||||
new_policy[category] = _merge_policies([
|
||||
policy.get(category) for policy in policies])
|
||||
cast(PolicyType, new_policy)
|
||||
return new_policy
|
||||
|
||||
|
||||
def _merge_policies(sources: List[CategoryType]) -> CategoryType:
|
||||
"""Merge a policy."""
|
||||
# When merging policies, the most permissive wins.
|
||||
# This means we order it like this:
|
||||
# True > Dict > None
|
||||
#
|
||||
# True: allow everything
|
||||
# Dict: specify more granular permissions
|
||||
# None: no opinion
|
||||
#
|
||||
# If there are multiple sources with a dict as policy, we recursively
|
||||
# merge each key in the source.
|
||||
|
||||
policy = None # type: CategoryType
|
||||
seen = set() # type: Set[str]
|
||||
for source in sources:
|
||||
if source is None:
|
||||
continue
|
||||
|
||||
# A source that's True will always win. Shortcut return.
|
||||
if source is True:
|
||||
return True
|
||||
|
||||
assert isinstance(source, dict)
|
||||
|
||||
if policy is None:
|
||||
policy = {}
|
||||
|
||||
assert isinstance(policy, dict)
|
||||
|
||||
for key in source:
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
|
||||
key_sources = []
|
||||
for src in sources:
|
||||
if isinstance(src, dict):
|
||||
key_sources.append(src.get(key))
|
||||
|
||||
policy[key] = _merge_policies(key_sources)
|
||||
|
||||
return policy
|
97
homeassistant/auth/permissions/__init__.py
Normal file
97
homeassistant/auth/permissions/__init__.py
Normal file
@ -0,0 +1,97 @@
|
||||
"""Permissions for Home Assistant."""
|
||||
import logging
|
||||
from typing import ( # noqa: F401
|
||||
cast, Any, Callable, Dict, List, Mapping, Set, Tuple, Union)
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.core import State
|
||||
|
||||
from .common import CategoryType, PolicyType
|
||||
from .entities import ENTITY_POLICY_SCHEMA, compile_entities
|
||||
from .merge import merge_policies # noqa
|
||||
|
||||
|
||||
# Default policy if group has no policy applied.
|
||||
DEFAULT_POLICY = {
|
||||
"entities": True
|
||||
} # type: PolicyType
|
||||
|
||||
CAT_ENTITIES = 'entities'
|
||||
|
||||
POLICY_SCHEMA = vol.Schema({
|
||||
vol.Optional(CAT_ENTITIES): ENTITY_POLICY_SCHEMA
|
||||
})
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AbstractPermissions:
|
||||
"""Default permissions class."""
|
||||
|
||||
def check_entity(self, entity_id: str, key: str) -> bool:
|
||||
"""Test if we can access entity."""
|
||||
raise NotImplementedError
|
||||
|
||||
def filter_states(self, states: List[State]) -> List[State]:
|
||||
"""Filter a list of states for what the user is allowed to see."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class PolicyPermissions(AbstractPermissions):
|
||||
"""Handle permissions."""
|
||||
|
||||
def __init__(self, policy: PolicyType) -> None:
|
||||
"""Initialize the permission class."""
|
||||
self._policy = policy
|
||||
self._compiled = {} # type: Dict[str, Callable[..., bool]]
|
||||
|
||||
def check_entity(self, entity_id: str, key: str) -> bool:
|
||||
"""Test if we can access entity."""
|
||||
func = self._policy_func(CAT_ENTITIES, compile_entities)
|
||||
return func(entity_id, (key,))
|
||||
|
||||
def filter_states(self, states: List[State]) -> List[State]:
|
||||
"""Filter a list of states for what the user is allowed to see."""
|
||||
func = self._policy_func(CAT_ENTITIES, compile_entities)
|
||||
keys = ('read',)
|
||||
return [entity for entity in states if func(entity.entity_id, keys)]
|
||||
|
||||
def _policy_func(self, category: str,
|
||||
compile_func: Callable[[CategoryType], Callable]) \
|
||||
-> Callable[..., bool]:
|
||||
"""Get a policy function."""
|
||||
func = self._compiled.get(category)
|
||||
|
||||
if func:
|
||||
return func
|
||||
|
||||
func = self._compiled[category] = compile_func(
|
||||
self._policy.get(category))
|
||||
|
||||
_LOGGER.debug("Compiled %s func: %s", category, func)
|
||||
|
||||
return func
|
||||
|
||||
def __eq__(self, other: Any) -> bool:
|
||||
"""Equals check."""
|
||||
# pylint: disable=protected-access
|
||||
return (isinstance(other, PolicyPermissions) and
|
||||
other._policy == self._policy)
|
||||
|
||||
|
||||
class _OwnerPermissions(AbstractPermissions):
|
||||
"""Owner permissions."""
|
||||
|
||||
# pylint: disable=no-self-use
|
||||
|
||||
def check_entity(self, entity_id: str, key: str) -> bool:
|
||||
"""Test if we can access entity."""
|
||||
return True
|
||||
|
||||
def filter_states(self, states: List[State]) -> List[State]:
|
||||
"""Filter a list of states for what the user is allowed to see."""
|
||||
return states
|
||||
|
||||
|
||||
OwnerPermissions = _OwnerPermissions() # pylint: disable=invalid-name
|
33
homeassistant/auth/permissions/common.py
Normal file
33
homeassistant/auth/permissions/common.py
Normal file
@ -0,0 +1,33 @@
|
||||
"""Common code for permissions."""
|
||||
from typing import ( # noqa: F401
|
||||
Mapping, Union, Any)
|
||||
|
||||
# MyPy doesn't support recursion yet. So writing it out as far as we need.
|
||||
|
||||
ValueType = Union[
|
||||
# Example: entities.all = { read: true, control: true }
|
||||
Mapping[str, bool],
|
||||
bool,
|
||||
None
|
||||
]
|
||||
|
||||
SubCategoryType = Union[
|
||||
# Example: entities.domains = { light: … }
|
||||
Mapping[str, ValueType],
|
||||
bool,
|
||||
None
|
||||
]
|
||||
|
||||
CategoryType = Union[
|
||||
# Example: entities.domains
|
||||
Mapping[str, SubCategoryType],
|
||||
# Example: entities.all
|
||||
Mapping[str, ValueType],
|
||||
bool,
|
||||
None
|
||||
]
|
||||
|
||||
# Example: { entities: … }
|
||||
PolicyType = Mapping[str, CategoryType]
|
||||
|
||||
SUBCAT_ALL = 'all'
|
149
homeassistant/auth/permissions/entities.py
Normal file
149
homeassistant/auth/permissions/entities.py
Normal file
@ -0,0 +1,149 @@
|
||||
"""Entity permissions."""
|
||||
from functools import wraps
|
||||
from typing import ( # noqa: F401
|
||||
Callable, Dict, List, Tuple, Union)
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from .common import CategoryType, ValueType, SUBCAT_ALL
|
||||
|
||||
|
||||
POLICY_READ = 'read'
|
||||
POLICY_CONTROL = 'control'
|
||||
POLICY_EDIT = 'edit'
|
||||
|
||||
SINGLE_ENTITY_SCHEMA = vol.Any(True, vol.Schema({
|
||||
vol.Optional(POLICY_READ): True,
|
||||
vol.Optional(POLICY_CONTROL): True,
|
||||
vol.Optional(POLICY_EDIT): True,
|
||||
}))
|
||||
|
||||
ENTITY_DOMAINS = 'domains'
|
||||
ENTITY_ENTITY_IDS = 'entity_ids'
|
||||
|
||||
ENTITY_VALUES_SCHEMA = vol.Any(True, vol.Schema({
|
||||
str: SINGLE_ENTITY_SCHEMA
|
||||
}))
|
||||
|
||||
ENTITY_POLICY_SCHEMA = vol.Any(True, vol.Schema({
|
||||
vol.Optional(SUBCAT_ALL): SINGLE_ENTITY_SCHEMA,
|
||||
vol.Optional(ENTITY_DOMAINS): ENTITY_VALUES_SCHEMA,
|
||||
vol.Optional(ENTITY_ENTITY_IDS): ENTITY_VALUES_SCHEMA,
|
||||
}))
|
||||
|
||||
|
||||
def _entity_allowed(schema: ValueType, keys: Tuple[str]) \
|
||||
-> Union[bool, None]:
|
||||
"""Test if an entity is allowed based on the keys."""
|
||||
if schema is None or isinstance(schema, bool):
|
||||
return schema
|
||||
assert isinstance(schema, dict)
|
||||
return schema.get(keys[0])
|
||||
|
||||
|
||||
def compile_entities(policy: CategoryType) \
|
||||
-> Callable[[str, Tuple[str]], bool]:
|
||||
"""Compile policy into a function that tests policy."""
|
||||
# None, Empty Dict, False
|
||||
if not policy:
|
||||
def apply_policy_deny_all(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Decline all."""
|
||||
return False
|
||||
|
||||
return apply_policy_deny_all
|
||||
|
||||
if policy is True:
|
||||
def apply_policy_allow_all(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Approve all."""
|
||||
return True
|
||||
|
||||
return apply_policy_allow_all
|
||||
|
||||
assert isinstance(policy, dict)
|
||||
|
||||
domains = policy.get(ENTITY_DOMAINS)
|
||||
entity_ids = policy.get(ENTITY_ENTITY_IDS)
|
||||
all_entities = policy.get(SUBCAT_ALL)
|
||||
|
||||
funcs = [] # type: List[Callable[[str, Tuple[str]], Union[None, bool]]]
|
||||
|
||||
# The order of these functions matter. The more precise are at the top.
|
||||
# If a function returns None, they cannot handle it.
|
||||
# If a function returns a boolean, that's the result to return.
|
||||
|
||||
# Setting entity_ids to a boolean is final decision for permissions
|
||||
# So return right away.
|
||||
if isinstance(entity_ids, bool):
|
||||
def allowed_entity_id_bool(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Test if allowed entity_id."""
|
||||
return entity_ids # type: ignore
|
||||
|
||||
return allowed_entity_id_bool
|
||||
|
||||
if entity_ids is not None:
|
||||
def allowed_entity_id_dict(entity_id: str, keys: Tuple[str]) \
|
||||
-> Union[None, bool]:
|
||||
"""Test if allowed entity_id."""
|
||||
return _entity_allowed(
|
||||
entity_ids.get(entity_id), keys) # type: ignore
|
||||
|
||||
funcs.append(allowed_entity_id_dict)
|
||||
|
||||
if isinstance(domains, bool):
|
||||
def allowed_domain_bool(entity_id: str, keys: Tuple[str]) \
|
||||
-> Union[None, bool]:
|
||||
"""Test if allowed domain."""
|
||||
return domains
|
||||
|
||||
funcs.append(allowed_domain_bool)
|
||||
|
||||
elif domains is not None:
|
||||
def allowed_domain_dict(entity_id: str, keys: Tuple[str]) \
|
||||
-> Union[None, bool]:
|
||||
"""Test if allowed domain."""
|
||||
domain = entity_id.split(".", 1)[0]
|
||||
return _entity_allowed(domains.get(domain), keys) # type: ignore
|
||||
|
||||
funcs.append(allowed_domain_dict)
|
||||
|
||||
if isinstance(all_entities, bool):
|
||||
def allowed_all_entities_bool(entity_id: str, keys: Tuple[str]) \
|
||||
-> Union[None, bool]:
|
||||
"""Test if allowed domain."""
|
||||
return all_entities
|
||||
funcs.append(allowed_all_entities_bool)
|
||||
|
||||
elif all_entities is not None:
|
||||
def allowed_all_entities_dict(entity_id: str, keys: Tuple[str]) \
|
||||
-> Union[None, bool]:
|
||||
"""Test if allowed domain."""
|
||||
return _entity_allowed(all_entities, keys)
|
||||
funcs.append(allowed_all_entities_dict)
|
||||
|
||||
# Can happen if no valid subcategories specified
|
||||
if not funcs:
|
||||
def apply_policy_deny_all_2(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Decline all."""
|
||||
return False
|
||||
|
||||
return apply_policy_deny_all_2
|
||||
|
||||
if len(funcs) == 1:
|
||||
func = funcs[0]
|
||||
|
||||
@wraps(func)
|
||||
def apply_policy_func(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Apply a single policy function."""
|
||||
return func(entity_id, keys) is True
|
||||
|
||||
return apply_policy_func
|
||||
|
||||
def apply_policy_funcs(entity_id: str, keys: Tuple[str]) -> bool:
|
||||
"""Apply several policy functions."""
|
||||
for func in funcs:
|
||||
result = func(entity_id, keys)
|
||||
if result is not None:
|
||||
return result
|
||||
return False
|
||||
|
||||
return apply_policy_funcs
|
65
homeassistant/auth/permissions/merge.py
Normal file
65
homeassistant/auth/permissions/merge.py
Normal file
@ -0,0 +1,65 @@
|
||||
"""Merging of policies."""
|
||||
from typing import ( # noqa: F401
|
||||
cast, Dict, List, Set)
|
||||
|
||||
from .common import PolicyType, CategoryType
|
||||
|
||||
|
||||
def merge_policies(policies: List[PolicyType]) -> PolicyType:
|
||||
"""Merge policies."""
|
||||
new_policy = {} # type: Dict[str, CategoryType]
|
||||
seen = set() # type: Set[str]
|
||||
for policy in policies:
|
||||
for category in policy:
|
||||
if category in seen:
|
||||
continue
|
||||
seen.add(category)
|
||||
new_policy[category] = _merge_policies([
|
||||
policy.get(category) for policy in policies])
|
||||
cast(PolicyType, new_policy)
|
||||
return new_policy
|
||||
|
||||
|
||||
def _merge_policies(sources: List[CategoryType]) -> CategoryType:
|
||||
"""Merge a policy."""
|
||||
# When merging policies, the most permissive wins.
|
||||
# This means we order it like this:
|
||||
# True > Dict > None
|
||||
#
|
||||
# True: allow everything
|
||||
# Dict: specify more granular permissions
|
||||
# None: no opinion
|
||||
#
|
||||
# If there are multiple sources with a dict as policy, we recursively
|
||||
# merge each key in the source.
|
||||
|
||||
policy = None # type: CategoryType
|
||||
seen = set() # type: Set[str]
|
||||
for source in sources:
|
||||
if source is None:
|
||||
continue
|
||||
|
||||
# A source that's True will always win. Shortcut return.
|
||||
if source is True:
|
||||
return True
|
||||
|
||||
assert isinstance(source, dict)
|
||||
|
||||
if policy is None:
|
||||
policy = cast(CategoryType, {})
|
||||
|
||||
assert isinstance(policy, dict)
|
||||
|
||||
for key in source:
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
|
||||
key_sources = []
|
||||
for src in sources:
|
||||
if isinstance(src, dict):
|
||||
key_sources.append(src.get(key))
|
||||
|
||||
policy[key] = _merge_policies(key_sources)
|
||||
|
||||
return policy
|
1
tests/auth/permissions/__init__.py
Normal file
1
tests/auth/permissions/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
"""Tests for permissions."""
|
187
tests/auth/permissions/test_entities.py
Normal file
187
tests/auth/permissions/test_entities.py
Normal file
@ -0,0 +1,187 @@
|
||||
"""Tests for entity permissions."""
|
||||
import pytest
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.auth.permissions.entities import (
|
||||
compile_entities, ENTITY_POLICY_SCHEMA)
|
||||
|
||||
|
||||
def test_entities_none():
|
||||
"""Test entity ID policy."""
|
||||
policy = None
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is False
|
||||
|
||||
|
||||
def test_entities_empty():
|
||||
"""Test entity ID policy."""
|
||||
policy = {}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is False
|
||||
|
||||
|
||||
def test_entities_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = False
|
||||
with pytest.raises(vol.Invalid):
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
|
||||
|
||||
def test_entities_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = True
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
|
||||
|
||||
def test_entities_domains_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': True
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
|
||||
|
||||
def test_entities_domains_domain_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': {
|
||||
'light': True
|
||||
}
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
assert compiled('switch.kitchen', ('read',)) is False
|
||||
|
||||
|
||||
def test_entities_domains_domain_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': {
|
||||
'light': False
|
||||
}
|
||||
}
|
||||
with pytest.raises(vol.Invalid):
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
|
||||
|
||||
def test_entities_entity_ids_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': True
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
|
||||
|
||||
def test_entities_entity_ids_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': False
|
||||
}
|
||||
with pytest.raises(vol.Invalid):
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
|
||||
|
||||
def test_entities_entity_ids_entity_id_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': {
|
||||
'light.kitchen': True
|
||||
}
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
assert compiled('switch.kitchen', ('read',)) is False
|
||||
|
||||
|
||||
def test_entities_entity_ids_entity_id_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': {
|
||||
'light.kitchen': False
|
||||
}
|
||||
}
|
||||
with pytest.raises(vol.Invalid):
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
|
||||
|
||||
def test_entities_control_only():
|
||||
"""Test policy granting control only."""
|
||||
policy = {
|
||||
'entity_ids': {
|
||||
'light.kitchen': {
|
||||
'read': True,
|
||||
}
|
||||
}
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
assert compiled('light.kitchen', ('control',)) is False
|
||||
assert compiled('light.kitchen', ('edit',)) is False
|
||||
|
||||
|
||||
def test_entities_read_control():
|
||||
"""Test policy granting control only."""
|
||||
policy = {
|
||||
'domains': {
|
||||
'light': {
|
||||
'read': True,
|
||||
'control': True,
|
||||
}
|
||||
}
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
assert compiled('light.kitchen', ('control',)) is True
|
||||
assert compiled('light.kitchen', ('edit',)) is False
|
||||
|
||||
|
||||
def test_entities_all_allow():
|
||||
"""Test policy allowing all entities."""
|
||||
policy = {
|
||||
'all': True
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
assert compiled('light.kitchen', ('control',)) is True
|
||||
assert compiled('switch.kitchen', ('read',)) is True
|
||||
|
||||
|
||||
def test_entities_all_read():
|
||||
"""Test policy applying read to all entities."""
|
||||
policy = {
|
||||
'all': {
|
||||
'read': True
|
||||
}
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is True
|
||||
assert compiled('light.kitchen', ('control',)) is False
|
||||
assert compiled('switch.kitchen', ('read',)) is True
|
||||
|
||||
|
||||
def test_entities_all_control():
|
||||
"""Test entity ID policy applying control to all."""
|
||||
policy = {
|
||||
'all': {
|
||||
'control': True
|
||||
}
|
||||
}
|
||||
ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = compile_entities(policy)
|
||||
assert compiled('light.kitchen', ('read',)) is False
|
||||
assert compiled('light.kitchen', ('control',)) is True
|
||||
assert compiled('switch.kitchen', ('read',)) is False
|
||||
assert compiled('switch.kitchen', ('control',)) is True
|
46
tests/auth/permissions/test_init.py
Normal file
46
tests/auth/permissions/test_init.py
Normal file
@ -0,0 +1,46 @@
|
||||
"""Tests for the auth permission system."""
|
||||
from homeassistant.core import State
|
||||
from homeassistant.auth import permissions
|
||||
|
||||
|
||||
def test_policy_perm_filter_states():
|
||||
"""Test filtering entitites."""
|
||||
states = [
|
||||
State('light.kitchen', 'on'),
|
||||
State('light.living_room', 'off'),
|
||||
State('light.balcony', 'on'),
|
||||
]
|
||||
perm = permissions.PolicyPermissions({
|
||||
'entities': {
|
||||
'entity_ids': {
|
||||
'light.kitchen': True,
|
||||
'light.balcony': True,
|
||||
}
|
||||
}
|
||||
})
|
||||
filtered = perm.filter_states(states)
|
||||
assert len(filtered) == 2
|
||||
assert filtered == [states[0], states[2]]
|
||||
|
||||
|
||||
def test_owner_permissions():
|
||||
"""Test owner permissions access all."""
|
||||
assert permissions.OwnerPermissions.check_entity('light.kitchen', 'write')
|
||||
states = [
|
||||
State('light.kitchen', 'on'),
|
||||
State('light.living_room', 'off'),
|
||||
State('light.balcony', 'on'),
|
||||
]
|
||||
assert permissions.OwnerPermissions.filter_states(states) == states
|
||||
|
||||
|
||||
def test_default_policy_allow_all():
|
||||
"""Test that the default policy is to allow all entity actions."""
|
||||
perm = permissions.PolicyPermissions(permissions.DEFAULT_POLICY)
|
||||
assert perm.check_entity('light.kitchen', 'read')
|
||||
states = [
|
||||
State('light.kitchen', 'on'),
|
||||
State('light.living_room', 'off'),
|
||||
State('light.balcony', 'on'),
|
||||
]
|
||||
assert perm.filter_states(states) == states
|
44
tests/auth/permissions/test_merge.py
Normal file
44
tests/auth/permissions/test_merge.py
Normal file
@ -0,0 +1,44 @@
|
||||
"""Tests for permissions merging."""
|
||||
from homeassistant.auth.permissions.merge import merge_policies
|
||||
|
||||
|
||||
def test_merging_permissions_true_rules_dict():
|
||||
"""Test merging policy with two entities."""
|
||||
policy1 = {
|
||||
'something_else': True,
|
||||
'entities': {
|
||||
'entity_ids': {
|
||||
'light.kitchen': True,
|
||||
}
|
||||
}
|
||||
}
|
||||
policy2 = {
|
||||
'entities': {
|
||||
'entity_ids': True
|
||||
}
|
||||
}
|
||||
assert merge_policies([policy1, policy2]) == {
|
||||
'something_else': True,
|
||||
'entities': {
|
||||
'entity_ids': True
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_merging_permissions_multiple_subcategories():
|
||||
"""Test merging policy with two entities."""
|
||||
policy1 = {
|
||||
'entities': None
|
||||
}
|
||||
policy2 = {
|
||||
'entities': {
|
||||
'entity_ids': True,
|
||||
}
|
||||
}
|
||||
policy3 = {
|
||||
'entities': True
|
||||
}
|
||||
assert merge_policies([policy1, policy2]) == policy2
|
||||
assert merge_policies([policy1, policy3]) == policy3
|
||||
|
||||
assert merge_policies([policy2, policy3]) == policy3
|
@ -29,6 +29,6 @@ def test_permissions_merged():
|
||||
# Make sure we cache instance
|
||||
assert user.permissions is user.permissions
|
||||
|
||||
assert user.permissions.check_entity('switch.bla') is True
|
||||
assert user.permissions.check_entity('light.kitchen') is True
|
||||
assert user.permissions.check_entity('light.not_kitchen') is False
|
||||
assert user.permissions.check_entity('switch.bla', 'read') is True
|
||||
assert user.permissions.check_entity('light.kitchen', 'read') is True
|
||||
assert user.permissions.check_entity('light.not_kitchen', 'read') is False
|
||||
|
@ -1,198 +0,0 @@
|
||||
"""Tests for the auth permission system."""
|
||||
import pytest
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.core import State
|
||||
from homeassistant.auth import permissions
|
||||
|
||||
|
||||
def test_entities_none():
|
||||
"""Test entity ID policy."""
|
||||
policy = None
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', []) is False
|
||||
|
||||
|
||||
def test_entities_empty():
|
||||
"""Test entity ID policy."""
|
||||
policy = {}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', []) is False
|
||||
|
||||
|
||||
def test_entities_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = False
|
||||
with pytest.raises(vol.Invalid):
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
|
||||
|
||||
def test_entities_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = True
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', []) is True
|
||||
|
||||
|
||||
def test_entities_domains_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': True
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', []) is True
|
||||
|
||||
|
||||
def test_entities_domains_domain_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': {
|
||||
'light': True
|
||||
}
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', []) is True
|
||||
assert compiled('switch.kitchen', []) is False
|
||||
|
||||
|
||||
def test_entities_domains_domain_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': {
|
||||
'light': False
|
||||
}
|
||||
}
|
||||
with pytest.raises(vol.Invalid):
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
|
||||
|
||||
def test_entities_entity_ids_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': True
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', []) is True
|
||||
|
||||
|
||||
def test_entities_entity_ids_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': False
|
||||
}
|
||||
with pytest.raises(vol.Invalid):
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
|
||||
|
||||
def test_entities_entity_ids_entity_id_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': {
|
||||
'light.kitchen': True
|
||||
}
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', []) is True
|
||||
assert compiled('switch.kitchen', []) is False
|
||||
|
||||
|
||||
def test_entities_entity_ids_entity_id_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': {
|
||||
'light.kitchen': False
|
||||
}
|
||||
}
|
||||
with pytest.raises(vol.Invalid):
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
|
||||
|
||||
def test_policy_perm_filter_states():
|
||||
"""Test filtering entitites."""
|
||||
states = [
|
||||
State('light.kitchen', 'on'),
|
||||
State('light.living_room', 'off'),
|
||||
State('light.balcony', 'on'),
|
||||
]
|
||||
perm = permissions.PolicyPermissions({
|
||||
'entities': {
|
||||
'entity_ids': {
|
||||
'light.kitchen': True,
|
||||
'light.balcony': True,
|
||||
}
|
||||
}
|
||||
})
|
||||
filtered = perm.filter_states(states)
|
||||
assert len(filtered) == 2
|
||||
assert filtered == [states[0], states[2]]
|
||||
|
||||
|
||||
def test_owner_permissions():
|
||||
"""Test owner permissions access all."""
|
||||
assert permissions.OwnerPermissions.check_entity('light.kitchen')
|
||||
states = [
|
||||
State('light.kitchen', 'on'),
|
||||
State('light.living_room', 'off'),
|
||||
State('light.balcony', 'on'),
|
||||
]
|
||||
assert permissions.OwnerPermissions.filter_states(states) == states
|
||||
|
||||
|
||||
def test_default_policy_allow_all():
|
||||
"""Test that the default policy is to allow all entity actions."""
|
||||
perm = permissions.PolicyPermissions(permissions.DEFAULT_POLICY)
|
||||
assert perm.check_entity('light.kitchen')
|
||||
states = [
|
||||
State('light.kitchen', 'on'),
|
||||
State('light.living_room', 'off'),
|
||||
State('light.balcony', 'on'),
|
||||
]
|
||||
assert perm.filter_states(states) == states
|
||||
|
||||
|
||||
def test_merging_permissions_true_rules_dict():
|
||||
"""Test merging policy with two entities."""
|
||||
policy1 = {
|
||||
'something_else': True,
|
||||
'entities': {
|
||||
'entity_ids': {
|
||||
'light.kitchen': True,
|
||||
}
|
||||
}
|
||||
}
|
||||
policy2 = {
|
||||
'entities': {
|
||||
'entity_ids': True
|
||||
}
|
||||
}
|
||||
assert permissions.merge_policies([policy1, policy2]) == {
|
||||
'something_else': True,
|
||||
'entities': {
|
||||
'entity_ids': True
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_merging_permissions_multiple_subcategories():
|
||||
"""Test merging policy with two entities."""
|
||||
policy1 = {
|
||||
'entities': None
|
||||
}
|
||||
policy2 = {
|
||||
'entities': {
|
||||
'entity_ids': True,
|
||||
}
|
||||
}
|
||||
policy3 = {
|
||||
'entities': True
|
||||
}
|
||||
assert permissions.merge_policies([policy1, policy2]) == policy2
|
||||
assert permissions.merge_policies([policy1, policy3]) == policy3
|
||||
|
||||
assert permissions.merge_policies([policy2, policy3]) == policy3
|
Loading…
x
Reference in New Issue
Block a user