mirror of
https://github.com/home-assistant/core.git
synced 2025-07-17 18:27:09 +00:00
Pre-split unifiprotect nested attribute lookups (#96862)
* Pre-split unifiprotect nested attribute lookups replaces and closes #96631 * Pre-split unifiprotect nested attribute lookups replaces and closes #96631 * comments
This commit is contained in:
parent
1c19c54e38
commit
660c95d784
@ -5,7 +5,7 @@ from collections.abc import Callable, Coroutine
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, Generic, TypeVar, cast
|
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
|
||||||
|
|
||||||
from pyunifiprotect.data import NVR, Event, ProtectAdoptableDeviceModel
|
from pyunifiprotect.data import NVR, Event, ProtectAdoptableDeviceModel
|
||||||
|
|
||||||
@ -19,6 +19,15 @@ _LOGGER = logging.getLogger(__name__)
|
|||||||
T = TypeVar("T", bound=ProtectAdoptableDeviceModel | NVR)
|
T = TypeVar("T", bound=ProtectAdoptableDeviceModel | NVR)
|
||||||
|
|
||||||
|
|
||||||
|
def split_tuple(value: tuple[str, ...] | str | None) -> tuple[str, ...] | None:
|
||||||
|
"""Split string to tuple."""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
assert isinstance(value, str)
|
||||||
|
return tuple(value.split("."))
|
||||||
|
|
||||||
|
|
||||||
class PermRequired(int, Enum):
|
class PermRequired(int, Enum):
|
||||||
"""Type of permission level required for entity."""
|
"""Type of permission level required for entity."""
|
||||||
|
|
||||||
@ -31,18 +40,34 @@ class PermRequired(int, Enum):
|
|||||||
class ProtectRequiredKeysMixin(EntityDescription, Generic[T]):
|
class ProtectRequiredKeysMixin(EntityDescription, Generic[T]):
|
||||||
"""Mixin for required keys."""
|
"""Mixin for required keys."""
|
||||||
|
|
||||||
ufp_required_field: str | None = None
|
# `ufp_required_field`, `ufp_value`, and `ufp_enabled` are defined as
|
||||||
ufp_value: str | None = None
|
# a `str` in the dataclass, but `__post_init__` converts it to a
|
||||||
|
# `tuple[str, ...]` to avoid doing it at run time in `get_nested_attr`
|
||||||
|
# which is usually called millions of times per day.
|
||||||
|
ufp_required_field: tuple[str, ...] | str | None = None
|
||||||
|
ufp_value: tuple[str, ...] | str | None = None
|
||||||
ufp_value_fn: Callable[[T], Any] | None = None
|
ufp_value_fn: Callable[[T], Any] | None = None
|
||||||
ufp_enabled: str | None = None
|
ufp_enabled: tuple[str, ...] | str | None = None
|
||||||
ufp_perm: PermRequired | None = None
|
ufp_perm: PermRequired | None = None
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
"""Pre-convert strings to tuples for faster get_nested_attr."""
|
||||||
|
self.ufp_required_field = split_tuple(self.ufp_required_field)
|
||||||
|
self.ufp_value = split_tuple(self.ufp_value)
|
||||||
|
self.ufp_enabled = split_tuple(self.ufp_enabled)
|
||||||
|
|
||||||
def get_ufp_value(self, obj: T) -> Any:
|
def get_ufp_value(self, obj: T) -> Any:
|
||||||
"""Return value from UniFi Protect device."""
|
"""Return value from UniFi Protect device."""
|
||||||
if self.ufp_value is not None:
|
if (ufp_value := self.ufp_value) is not None:
|
||||||
return get_nested_attr(obj, self.ufp_value)
|
if TYPE_CHECKING:
|
||||||
if self.ufp_value_fn is not None:
|
# `ufp_value` is defined as a `str` in the dataclass, but
|
||||||
return self.ufp_value_fn(obj)
|
# `__post_init__` converts it to a `tuple[str, ...]` to avoid
|
||||||
|
# doing it at run time in `get_nested_attr` which is usually called
|
||||||
|
# millions of times per day. This tells mypy that it's a tuple.
|
||||||
|
assert isinstance(ufp_value, tuple)
|
||||||
|
return get_nested_attr(obj, ufp_value)
|
||||||
|
if (ufp_value_fn := self.ufp_value_fn) is not None:
|
||||||
|
return ufp_value_fn(obj)
|
||||||
|
|
||||||
# reminder for future that one is required
|
# reminder for future that one is required
|
||||||
raise RuntimeError( # pragma: no cover
|
raise RuntimeError( # pragma: no cover
|
||||||
@ -51,16 +76,27 @@ class ProtectRequiredKeysMixin(EntityDescription, Generic[T]):
|
|||||||
|
|
||||||
def get_ufp_enabled(self, obj: T) -> bool:
|
def get_ufp_enabled(self, obj: T) -> bool:
|
||||||
"""Return value from UniFi Protect device."""
|
"""Return value from UniFi Protect device."""
|
||||||
if self.ufp_enabled is not None:
|
if (ufp_enabled := self.ufp_enabled) is not None:
|
||||||
return bool(get_nested_attr(obj, self.ufp_enabled))
|
if TYPE_CHECKING:
|
||||||
|
# `ufp_enabled` is defined as a `str` in the dataclass, but
|
||||||
|
# `__post_init__` converts it to a `tuple[str, ...]` to avoid
|
||||||
|
# doing it at run time in `get_nested_attr` which is usually called
|
||||||
|
# millions of times per day. This tells mypy that it's a tuple.
|
||||||
|
assert isinstance(ufp_enabled, tuple)
|
||||||
|
return bool(get_nested_attr(obj, ufp_enabled))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def has_required(self, obj: T) -> bool:
|
def has_required(self, obj: T) -> bool:
|
||||||
"""Return if has required field."""
|
"""Return if has required field."""
|
||||||
|
if (ufp_required_field := self.ufp_required_field) is None:
|
||||||
if self.ufp_required_field is None:
|
|
||||||
return True
|
return True
|
||||||
return bool(get_nested_attr(obj, self.ufp_required_field))
|
if TYPE_CHECKING:
|
||||||
|
# `ufp_required_field` is defined as a `str` in the dataclass, but
|
||||||
|
# `__post_init__` converts it to a `tuple[str, ...]` to avoid
|
||||||
|
# doing it at run time in `get_nested_attr` which is usually called
|
||||||
|
# millions of times per day. This tells mypy that it's a tuple.
|
||||||
|
assert isinstance(ufp_required_field, tuple)
|
||||||
|
return bool(get_nested_attr(obj, ufp_required_field))
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@ -73,7 +109,7 @@ class ProtectEventMixin(ProtectRequiredKeysMixin[T]):
|
|||||||
"""Return value from UniFi Protect device."""
|
"""Return value from UniFi Protect device."""
|
||||||
|
|
||||||
if self.ufp_event_obj is not None:
|
if self.ufp_event_obj is not None:
|
||||||
return cast(Event, get_nested_attr(obj, self.ufp_event_obj))
|
return cast(Event, getattr(obj, self.ufp_event_obj, None))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_is_on(self, event: Event | None) -> bool:
|
def get_is_on(self, event: Event | None) -> bool:
|
||||||
|
@ -41,13 +41,13 @@ from .const import (
|
|||||||
_SENTINEL = object()
|
_SENTINEL = object()
|
||||||
|
|
||||||
|
|
||||||
def get_nested_attr(obj: Any, attr: str) -> Any:
|
def get_nested_attr(obj: Any, attrs: tuple[str, ...]) -> Any:
|
||||||
"""Fetch a nested attribute."""
|
"""Fetch a nested attribute."""
|
||||||
if "." not in attr:
|
if len(attrs) == 1:
|
||||||
value = getattr(obj, attr, None)
|
value = getattr(obj, attrs[0], None)
|
||||||
else:
|
else:
|
||||||
value = obj
|
value = obj
|
||||||
for key in attr.split("."):
|
for key in attrs:
|
||||||
if (value := getattr(value, key, _SENTINEL)) is _SENTINEL:
|
if (value := getattr(value, key, _SENTINEL)) is _SENTINEL:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user