diff --git a/homeassistant/components/tuya/binary_sensor.py b/homeassistant/components/tuya/binary_sensor.py index 3d6ad871f20..4e6fe260afc 100644 --- a/homeassistant/components/tuya/binary_sensor.py +++ b/homeassistant/components/tuya/binary_sensor.py @@ -15,11 +15,11 @@ from homeassistant.const import EntityCategory from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback -from homeassistant.util.json import json_loads from . import TuyaConfigEntry -from .const import TUYA_DISCOVERY_NEW, DeviceCategory, DPCode, DPType +from .const import TUYA_DISCOVERY_NEW, DeviceCategory, DPCode from .entity import TuyaEntity +from .models import DPCodeBitmapBitWrapper, DPCodeBooleanWrapper, DPCodeWrapper @dataclass(frozen=True) @@ -366,20 +366,48 @@ BINARY_SENSORS: dict[DeviceCategory, tuple[TuyaBinarySensorEntityDescription, .. } -def _get_bitmap_bit_mask( - device: CustomerDevice, dpcode: str, bitmap_key: str | None -) -> int | None: - """Get the bit mask for a given bitmap description.""" - if ( - bitmap_key is None - or (status_range := device.status_range.get(dpcode)) is None - or status_range.type != DPType.BITMAP - or not isinstance(bitmap_values := json_loads(status_range.values), dict) - or not isinstance(bitmap_labels := bitmap_values.get("label"), list) - or bitmap_key not in bitmap_labels - ): +class _CustomDPCodeWrapper(DPCodeWrapper): + """Custom DPCode Wrapper to check for values in a set.""" + + _valid_values: set[bool | float | int | str] + + def __init__( + self, dpcode: str, valid_values: set[bool | float | int | str] + ) -> None: + """Init CustomDPCodeBooleanWrapper.""" + super().__init__(dpcode) + self._valid_values = valid_values + + def read_device_status(self, device: CustomerDevice) -> bool | None: + """Read the device value for the dpcode.""" + if (raw_value := self._read_device_status_raw(device)) is None: + return None + return raw_value in self._valid_values + + +def _get_dpcode_wrapper( + device: CustomerDevice, + description: TuyaBinarySensorEntityDescription, +) -> DPCodeWrapper | None: + """Get DPCode wrapper for an entity description.""" + dpcode = description.dpcode or description.key + if description.bitmap_key is not None: + return DPCodeBitmapBitWrapper.find_dpcode( + device, dpcode, bitmap_key=description.bitmap_key + ) + + if bool_type := DPCodeBooleanWrapper.find_dpcode(device, dpcode): + return bool_type + + # Legacy / compatibility + if dpcode not in device.status: return None - return bitmap_labels.index(bitmap_key) + return _CustomDPCodeWrapper( + dpcode, + description.on_value + if isinstance(description.on_value, set) + else {description.on_value}, + ) async def async_setup_entry( @@ -397,25 +425,11 @@ async def async_setup_entry( for device_id in device_ids: device = manager.device_map[device_id] if descriptions := BINARY_SENSORS.get(device.category): - for description in descriptions: - dpcode = description.dpcode or description.key - if dpcode in device.status: - mask = _get_bitmap_bit_mask( - device, dpcode, description.bitmap_key - ) - - if ( - description.bitmap_key is None # Regular binary sensor - or mask is not None # Bitmap sensor with valid mask - ): - entities.append( - TuyaBinarySensorEntity( - device, - manager, - description, - mask, - ) - ) + entities.extend( + TuyaBinarySensorEntity(device, manager, description, dpcode_wrapper) + for description in descriptions + if (dpcode_wrapper := _get_dpcode_wrapper(device, description)) + ) async_add_entities(entities) @@ -436,26 +450,15 @@ class TuyaBinarySensorEntity(TuyaEntity, BinarySensorEntity): device: CustomerDevice, device_manager: Manager, description: TuyaBinarySensorEntityDescription, - bit_mask: int | None = None, + dpcode_wrapper: DPCodeWrapper, ) -> None: """Init Tuya binary sensor.""" super().__init__(device, device_manager) self.entity_description = description self._attr_unique_id = f"{super().unique_id}{description.key}" - self._bit_mask = bit_mask + self._dpcode_wrapper = dpcode_wrapper @property - def is_on(self) -> bool: + def is_on(self) -> bool | None: """Return true if sensor is on.""" - dpcode = self.entity_description.dpcode or self.entity_description.key - if dpcode not in self.device.status: - return False - - if self._bit_mask is not None: - # For bitmap sensors, check the specific bit mask - return (self.device.status[dpcode] & (1 << self._bit_mask)) != 0 - - if isinstance(self.entity_description.on_value, set): - return self.device.status[dpcode] in self.entity_description.on_value - - return self.device.status[dpcode] == self.entity_description.on_value + return self._dpcode_wrapper.read_device_status(self.device) diff --git a/homeassistant/components/tuya/models.py b/homeassistant/components/tuya/models.py index 8f95449970a..f8e03efae90 100644 --- a/homeassistant/components/tuya/models.py +++ b/homeassistant/components/tuya/models.py @@ -101,6 +101,20 @@ class IntegerTypeData(TypeInformation): ) +@dataclass +class BitmapTypeInformation(TypeInformation): + """Bitmap type information.""" + + label: list[str] + + @classmethod + def from_json(cls, dpcode: DPCode, data: str) -> Self | None: + """Load JSON string and return a BitmapTypeInformation object.""" + if not (parsed := json.loads(data)): + return None + return cls(dpcode, **parsed) + + @dataclass class EnumTypeData(TypeInformation): """Enum Type Data.""" @@ -116,6 +130,7 @@ class EnumTypeData(TypeInformation): _TYPE_INFORMATION_MAPPINGS: dict[DPType, type[TypeInformation]] = { + DPType.BITMAP: BitmapTypeInformation, DPType.BOOLEAN: TypeInformation, DPType.ENUM: EnumTypeData, DPType.INTEGER: IntegerTypeData, @@ -147,13 +162,13 @@ class DPCodeWrapper(ABC): The raw device status is converted to a Home Assistant value. """ - @abstractmethod def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any: """Convert a Home Assistant value back to a raw device value. This is called by `get_update_command` to prepare the value for sending - back to the device, and should be implemented in concrete classes. + back to the device, and should be implemented in concrete classes if needed. """ + raise NotImplementedError def get_update_command(self, device: CustomerDevice, value: Any) -> dict[str, Any]: """Get the update command for the dpcode. @@ -275,6 +290,48 @@ class DPCodeIntegerWrapper(DPCodeTypeInformationWrapper[IntegerTypeData]): ) +class DPCodeBitmapBitWrapper(DPCodeWrapper): + """Simple wrapper for a specific bit in bitmap values.""" + + def __init__(self, dpcode: str, mask: int) -> None: + """Init DPCodeBitmapWrapper.""" + super().__init__(dpcode) + self._mask = mask + + def read_device_status(self, device: CustomerDevice) -> bool | None: + """Read the device value for the dpcode.""" + if (raw_value := self._read_device_status_raw(device)) is None: + return None + return (raw_value & (1 << self._mask)) != 0 + + @classmethod + def find_dpcode( + cls, + device: CustomerDevice, + dpcodes: str | DPCode | tuple[DPCode, ...], + *, + bitmap_key: str, + ) -> Self | None: + """Find and return a DPCodeBitmapBitWrapper for the given DP codes.""" + if ( + type_information := find_dpcode(device, dpcodes, dptype=DPType.BITMAP) + ) and bitmap_key in type_information.label: + return cls( + type_information.dpcode, type_information.label.index(bitmap_key) + ) + return None + + +@overload +def find_dpcode( + device: CustomerDevice, + dpcodes: str | DPCode | tuple[DPCode, ...] | None, + *, + prefer_function: bool = False, + dptype: Literal[DPType.BITMAP], +) -> BitmapTypeInformation | None: ... + + @overload def find_dpcode( device: CustomerDevice,