mirror of
https://github.com/home-assistant/core.git
synced 2025-11-15 22:10:09 +00:00
435 lines
13 KiB
Python
435 lines
13 KiB
Python
"""Tuya Home Assistant Base Device Model."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from abc import ABC, abstractmethod
|
|
import base64
|
|
from dataclasses import dataclass
|
|
import json
|
|
import struct
|
|
from typing import Any, Literal, Self, overload
|
|
|
|
from tuya_sharing import CustomerDevice
|
|
|
|
from .const import DPCode, DPType
|
|
from .util import remap_value
|
|
|
|
|
|
@dataclass
|
|
class TypeInformation:
|
|
"""Type information.
|
|
|
|
As provided by the SDK, from `device.function` / `device.status_range`.
|
|
"""
|
|
|
|
dpcode: DPCode
|
|
|
|
@classmethod
|
|
def from_json(cls, dpcode: DPCode, data: str) -> Self | None:
|
|
"""Load JSON string and return a TypeInformation object."""
|
|
return cls(dpcode)
|
|
|
|
|
|
@dataclass
|
|
class IntegerTypeData(TypeInformation):
|
|
"""Integer Type Data."""
|
|
|
|
min: int
|
|
max: int
|
|
scale: float
|
|
step: float
|
|
unit: str | None = None
|
|
type: str | None = None
|
|
|
|
@property
|
|
def max_scaled(self) -> float:
|
|
"""Return the max scaled."""
|
|
return self.scale_value(self.max)
|
|
|
|
@property
|
|
def min_scaled(self) -> float:
|
|
"""Return the min scaled."""
|
|
return self.scale_value(self.min)
|
|
|
|
@property
|
|
def step_scaled(self) -> float:
|
|
"""Return the step scaled."""
|
|
return self.step / (10**self.scale)
|
|
|
|
def scale_value(self, value: float) -> float:
|
|
"""Scale a value."""
|
|
return value / (10**self.scale)
|
|
|
|
def scale_value_back(self, value: float) -> int:
|
|
"""Return raw value for scaled."""
|
|
return int(value * (10**self.scale))
|
|
|
|
def remap_value_to(
|
|
self,
|
|
value: float,
|
|
to_min: float = 0,
|
|
to_max: float = 255,
|
|
reverse: bool = False,
|
|
) -> float:
|
|
"""Remap a value from this range to a new range."""
|
|
return remap_value(value, self.min, self.max, to_min, to_max, reverse)
|
|
|
|
def remap_value_from(
|
|
self,
|
|
value: float,
|
|
from_min: float = 0,
|
|
from_max: float = 255,
|
|
reverse: bool = False,
|
|
) -> float:
|
|
"""Remap a value from its current range to this range."""
|
|
return remap_value(value, from_min, from_max, self.min, self.max, reverse)
|
|
|
|
@classmethod
|
|
def from_json(cls, dpcode: DPCode, data: str) -> Self | None:
|
|
"""Load JSON string and return a IntegerTypeData object."""
|
|
if not (parsed := json.loads(data)):
|
|
return None
|
|
|
|
return cls(
|
|
dpcode,
|
|
min=int(parsed["min"]),
|
|
max=int(parsed["max"]),
|
|
scale=float(parsed["scale"]),
|
|
step=max(float(parsed["step"]), 1),
|
|
unit=parsed.get("unit"),
|
|
type=parsed.get("type"),
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class BitmapTypeInformation(TypeInformation):
|
|
"""Bitmap type information."""
|
|
|
|
label: list[str]
|
|
|
|
@classmethod
|
|
def from_json(cls, dpcode: DPCode, data: str) -> Self | None:
|
|
"""Load JSON string and return a BitmapTypeInformation object."""
|
|
if not (parsed := json.loads(data)):
|
|
return None
|
|
return cls(dpcode, **parsed)
|
|
|
|
|
|
@dataclass
|
|
class EnumTypeData(TypeInformation):
|
|
"""Enum Type Data."""
|
|
|
|
range: list[str]
|
|
|
|
@classmethod
|
|
def from_json(cls, dpcode: DPCode, data: str) -> Self | None:
|
|
"""Load JSON string and return a EnumTypeData object."""
|
|
if not (parsed := json.loads(data)):
|
|
return None
|
|
return cls(dpcode, **parsed)
|
|
|
|
|
|
_TYPE_INFORMATION_MAPPINGS: dict[DPType, type[TypeInformation]] = {
|
|
DPType.BITMAP: BitmapTypeInformation,
|
|
DPType.BOOLEAN: TypeInformation,
|
|
DPType.ENUM: EnumTypeData,
|
|
DPType.INTEGER: IntegerTypeData,
|
|
}
|
|
|
|
|
|
class DPCodeWrapper(ABC):
|
|
"""Base DPCode wrapper.
|
|
|
|
Used as a common interface for referring to a DPCode, and
|
|
access read conversion routines.
|
|
"""
|
|
|
|
def __init__(self, dpcode: str) -> None:
|
|
"""Init DPCodeWrapper."""
|
|
self.dpcode = dpcode
|
|
|
|
def _read_device_status_raw(self, device: CustomerDevice) -> Any | None:
|
|
"""Read the raw device status for the DPCode.
|
|
|
|
Private helper method for `read_device_status`.
|
|
"""
|
|
return device.status.get(self.dpcode)
|
|
|
|
@abstractmethod
|
|
def read_device_status(self, device: CustomerDevice) -> Any | None:
|
|
"""Read the device value for the dpcode.
|
|
|
|
The raw device status is converted to a Home Assistant value.
|
|
"""
|
|
|
|
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
|
|
"""Convert a Home Assistant value back to a raw device value.
|
|
|
|
This is called by `get_update_command` to prepare the value for sending
|
|
back to the device, and should be implemented in concrete classes if needed.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def get_update_command(self, device: CustomerDevice, value: Any) -> dict[str, Any]:
|
|
"""Get the update command for the dpcode.
|
|
|
|
The Home Assistant value is converted back to a raw device value.
|
|
"""
|
|
return {
|
|
"code": self.dpcode,
|
|
"value": self._convert_value_to_raw_value(device, value),
|
|
}
|
|
|
|
|
|
class DPCodeTypeInformationWrapper[T: TypeInformation](DPCodeWrapper):
|
|
"""Base DPCode wrapper with Type Information."""
|
|
|
|
DPTYPE: DPType
|
|
type_information: T
|
|
|
|
def __init__(self, dpcode: str, type_information: T) -> None:
|
|
"""Init DPCodeWrapper."""
|
|
super().__init__(dpcode)
|
|
self.type_information = type_information
|
|
|
|
@classmethod
|
|
def find_dpcode(
|
|
cls,
|
|
device: CustomerDevice,
|
|
dpcodes: str | DPCode | tuple[DPCode, ...],
|
|
*,
|
|
prefer_function: bool = False,
|
|
) -> Self | None:
|
|
"""Find and return a DPCodeTypeInformationWrapper for the given DP codes."""
|
|
if type_information := find_dpcode( # type: ignore[call-overload]
|
|
device, dpcodes, dptype=cls.DPTYPE, prefer_function=prefer_function
|
|
):
|
|
return cls(
|
|
dpcode=type_information.dpcode, type_information=type_information
|
|
)
|
|
return None
|
|
|
|
|
|
class DPCodeBooleanWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
|
|
"""Simple wrapper for boolean values.
|
|
|
|
Supports True/False only.
|
|
"""
|
|
|
|
DPTYPE = DPType.BOOLEAN
|
|
|
|
def read_device_status(self, device: CustomerDevice) -> bool | None:
|
|
"""Read the device value for the dpcode."""
|
|
if (raw_value := self._read_device_status_raw(device)) in (True, False):
|
|
return raw_value
|
|
return None
|
|
|
|
def _convert_value_to_raw_value(
|
|
self, device: CustomerDevice, value: Any
|
|
) -> Any | None:
|
|
"""Convert a Home Assistant value back to a raw device value."""
|
|
if value in (True, False):
|
|
return value
|
|
# Currently only called with boolean values
|
|
# Safety net in case of future changes
|
|
raise ValueError(f"Invalid boolean value `{value}`")
|
|
|
|
|
|
class DPCodeEnumWrapper(DPCodeTypeInformationWrapper[EnumTypeData]):
|
|
"""Simple wrapper for EnumTypeData values."""
|
|
|
|
DPTYPE = DPType.ENUM
|
|
|
|
def read_device_status(self, device: CustomerDevice) -> str | None:
|
|
"""Read the device value for the dpcode.
|
|
|
|
Values outside of the list defined by the Enum type information will
|
|
return None.
|
|
"""
|
|
if (
|
|
raw_value := self._read_device_status_raw(device)
|
|
) in self.type_information.range:
|
|
return raw_value
|
|
return None
|
|
|
|
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
|
|
"""Convert a Home Assistant value back to a raw device value."""
|
|
if value in self.type_information.range:
|
|
return value
|
|
# Guarded by select option validation
|
|
# Safety net in case of future changes
|
|
raise ValueError(
|
|
f"Enum value `{value}` out of range: {self.type_information.range}"
|
|
)
|
|
|
|
|
|
class DPCodeIntegerWrapper(DPCodeTypeInformationWrapper[IntegerTypeData]):
|
|
"""Simple wrapper for IntegerTypeData values."""
|
|
|
|
DPTYPE = DPType.INTEGER
|
|
|
|
def read_device_status(self, device: CustomerDevice) -> float | None:
|
|
"""Read the device value for the dpcode.
|
|
|
|
Value will be scaled based on the Integer type information.
|
|
"""
|
|
if (raw_value := self._read_device_status_raw(device)) is None:
|
|
return None
|
|
return raw_value / (10**self.type_information.scale)
|
|
|
|
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
|
|
"""Convert a Home Assistant value back to a raw device value."""
|
|
new_value = round(value * (10**self.type_information.scale))
|
|
if self.type_information.min <= new_value <= self.type_information.max:
|
|
return new_value
|
|
# Guarded by number validation
|
|
# Safety net in case of future changes
|
|
raise ValueError(
|
|
f"Value `{new_value}` (converted from `{value}`) out of range:"
|
|
f" ({self.type_information.min}-{self.type_information.max})"
|
|
)
|
|
|
|
|
|
class DPCodeBitmapBitWrapper(DPCodeWrapper):
|
|
"""Simple wrapper for a specific bit in bitmap values."""
|
|
|
|
def __init__(self, dpcode: str, mask: int) -> None:
|
|
"""Init DPCodeBitmapWrapper."""
|
|
super().__init__(dpcode)
|
|
self._mask = mask
|
|
|
|
def read_device_status(self, device: CustomerDevice) -> bool | None:
|
|
"""Read the device value for the dpcode."""
|
|
if (raw_value := self._read_device_status_raw(device)) is None:
|
|
return None
|
|
return (raw_value & (1 << self._mask)) != 0
|
|
|
|
@classmethod
|
|
def find_dpcode(
|
|
cls,
|
|
device: CustomerDevice,
|
|
dpcodes: str | DPCode | tuple[DPCode, ...],
|
|
*,
|
|
bitmap_key: str,
|
|
) -> Self | None:
|
|
"""Find and return a DPCodeBitmapBitWrapper for the given DP codes."""
|
|
if (
|
|
type_information := find_dpcode(device, dpcodes, dptype=DPType.BITMAP)
|
|
) and bitmap_key in type_information.label:
|
|
return cls(
|
|
type_information.dpcode, type_information.label.index(bitmap_key)
|
|
)
|
|
return None
|
|
|
|
|
|
@overload
|
|
def find_dpcode(
|
|
device: CustomerDevice,
|
|
dpcodes: str | DPCode | tuple[DPCode, ...] | None,
|
|
*,
|
|
prefer_function: bool = False,
|
|
dptype: Literal[DPType.BITMAP],
|
|
) -> BitmapTypeInformation | None: ...
|
|
|
|
|
|
@overload
|
|
def find_dpcode(
|
|
device: CustomerDevice,
|
|
dpcodes: str | DPCode | tuple[DPCode, ...] | None,
|
|
*,
|
|
prefer_function: bool = False,
|
|
dptype: Literal[DPType.ENUM],
|
|
) -> EnumTypeData | None: ...
|
|
|
|
|
|
@overload
|
|
def find_dpcode(
|
|
device: CustomerDevice,
|
|
dpcodes: str | DPCode | tuple[DPCode, ...] | None,
|
|
*,
|
|
prefer_function: bool = False,
|
|
dptype: Literal[DPType.INTEGER],
|
|
) -> IntegerTypeData | None: ...
|
|
|
|
|
|
def find_dpcode(
|
|
device: CustomerDevice,
|
|
dpcodes: str | DPCode | tuple[DPCode, ...] | None,
|
|
*,
|
|
prefer_function: bool = False,
|
|
dptype: DPType,
|
|
) -> TypeInformation | None:
|
|
"""Find type information for a matching DP code available for this device."""
|
|
if not (type_information_cls := _TYPE_INFORMATION_MAPPINGS.get(dptype)):
|
|
raise NotImplementedError(f"find_dpcode not supported for {dptype}")
|
|
|
|
if dpcodes is None:
|
|
return None
|
|
|
|
if isinstance(dpcodes, str):
|
|
dpcodes = (DPCode(dpcodes),)
|
|
elif not isinstance(dpcodes, tuple):
|
|
dpcodes = (dpcodes,)
|
|
|
|
lookup_tuple = (
|
|
(device.function, device.status_range)
|
|
if prefer_function
|
|
else (device.status_range, device.function)
|
|
)
|
|
|
|
for dpcode in dpcodes:
|
|
for device_specs in lookup_tuple:
|
|
if (
|
|
(current_definition := device_specs.get(dpcode))
|
|
and current_definition.type == dptype
|
|
and (
|
|
type_information := type_information_cls.from_json(
|
|
dpcode, current_definition.values
|
|
)
|
|
)
|
|
):
|
|
return type_information
|
|
|
|
return None
|
|
|
|
|
|
class ComplexValue:
|
|
"""Complex value (for JSON/RAW parsing)."""
|
|
|
|
@classmethod
|
|
def from_json(cls, data: str) -> Self:
|
|
"""Load JSON string and return a ComplexValue object."""
|
|
raise NotImplementedError("from_json is not implemented for this type")
|
|
|
|
@classmethod
|
|
def from_raw(cls, data: str) -> Self | None:
|
|
"""Decode base64 string and return a ComplexValue object."""
|
|
raise NotImplementedError("from_raw is not implemented for this type")
|
|
|
|
|
|
@dataclass
|
|
class ElectricityValue(ComplexValue):
|
|
"""Electricity complex value."""
|
|
|
|
electriccurrent: str | None = None
|
|
power: str | None = None
|
|
voltage: str | None = None
|
|
|
|
@classmethod
|
|
def from_json(cls, data: str) -> Self:
|
|
"""Load JSON string and return a ElectricityValue object."""
|
|
return cls(**json.loads(data.lower()))
|
|
|
|
@classmethod
|
|
def from_raw(cls, data: str) -> Self | None:
|
|
"""Decode base64 string and return a ElectricityValue object."""
|
|
raw = base64.b64decode(data)
|
|
if len(raw) == 0:
|
|
return None
|
|
voltage = struct.unpack(">H", raw[0:2])[0] / 10.0
|
|
electriccurrent = struct.unpack(">L", b"\x00" + raw[2:5])[0] / 1000.0
|
|
power = struct.unpack(">L", b"\x00" + raw[5:8])[0] / 1000.0
|
|
return cls(
|
|
electriccurrent=str(electriccurrent), power=str(power), voltage=str(voltage)
|
|
)
|