Change more properties to attributes for rfxtrx (#74880)

* Move more properties to attributes

* Apply suggestions from code review

Co-authored-by: epenet <6771947+epenet@users.noreply.github.com>

* Convert states to attributes

* Adjustments from review

Co-authored-by: epenet <6771947+epenet@users.noreply.github.com>
This commit is contained in:
Joakim Plate 2022-07-11 16:21:26 +02:00 committed by GitHub
parent c1a4dc2f22
commit ee4749b207
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 76 additions and 117 deletions

View File

@ -440,6 +440,14 @@ def get_device_tuple_from_identifiers(
return DeviceTuple(identifier2[1], identifier2[2], identifier2[3])
def get_identifiers_from_device_tuple(
device_tuple: DeviceTuple,
) -> set[tuple[str, str]]:
"""Calculate the device identifier from a device tuple."""
# work around legacy identifier, being a multi tuple value
return {(DOMAIN, *device_tuple)} # type: ignore[arg-type]
async def async_remove_config_entry_device(
hass: HomeAssistant, config_entry: ConfigEntry, device_entry: dr.DeviceEntry
) -> bool:
@ -469,10 +477,15 @@ class RfxtrxEntity(RestoreEntity):
event: rfxtrxmod.RFXtrxEvent | None = None,
) -> None:
"""Initialize the device."""
self._attr_device_info = DeviceInfo(
identifiers=get_identifiers_from_device_tuple(device_id),
model=device.type_string,
name=f"{device.type_string} {device.id_string}",
)
self._attr_unique_id = "_".join(x for x in device_id)
self._device = device
self._event = event
self._device_id = device_id
self._unique_id = "_".join(x for x in self._device_id)
# If id_string is 213c7f2:1, the group_id is 213c7f2, and the device will respond to
# group events regardless of their group indices.
(self._group_id, _, _) = device.id_string.partition(":")
@ -493,20 +506,6 @@ class RfxtrxEntity(RestoreEntity):
return None
return {ATTR_EVENT: "".join(f"{x:02x}" for x in self._event.data)}
@property
def unique_id(self):
"""Return unique identifier of remote device."""
return self._unique_id
@property
def device_info(self):
"""Return the device info."""
return DeviceInfo(
identifiers={(DOMAIN, *self._device_id)},
model=self._device.type_string,
name=f"{self._device.type_string} {self._device.id_string}",
)
def _event_applies(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple):
"""Check if event applies to me."""
if isinstance(event, rfxtrxmod.ControlEvent):
@ -546,7 +545,6 @@ class RfxtrxCommandEntity(RfxtrxEntity):
) -> None:
"""Initialzie a switch or light device."""
super().__init__(device, device_id, event=event)
self._state: bool | None = None
async def _async_send(self, fun, *args):
rfx_object = self.hass.data[DOMAIN][DATA_RFXOBJECT]

View File

@ -124,6 +124,9 @@ async def async_setup_entry(
class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
"""A representation of a RFXtrx binary sensor."""
_attr_force_update = True
"""We should force updates. Repeated states have meaning."""
def __init__(
self,
device: rfxtrxmod.RFXtrxDevice,
@ -140,7 +143,6 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
self.entity_description = entity_description
self._data_bits = data_bits
self._off_delay = off_delay
self._state: bool | None = None
self._delay_listener: CALLBACK_TYPE | None = None
self._cmd_on = cmd_on
self._cmd_off = cmd_off
@ -152,20 +154,10 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
if self._event is None:
old_state = await self.async_get_last_state()
if old_state is not None:
self._state = old_state.state == STATE_ON
self._attr_is_on = old_state.state == STATE_ON
if self._state and self._off_delay is not None:
self._state = False
@property
def force_update(self) -> bool:
"""We should force updates. Repeated states have meaning."""
return True
@property
def is_on(self):
"""Return true if the sensor state is True."""
return self._state
if self.is_on and self._off_delay is not None:
self._attr_is_on = False
def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent):
"""Apply event for a lighting 4 device."""
@ -174,22 +166,22 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
assert cmdstr
cmd = int(cmdstr, 16)
if cmd == self._cmd_on:
self._state = True
self._attr_is_on = True
elif cmd == self._cmd_off:
self._state = False
self._attr_is_on = False
else:
self._state = True
self._attr_is_on = True
def _apply_event_standard(self, event: rfxtrxmod.RFXtrxEvent):
assert isinstance(event, (rfxtrxmod.SensorEvent, rfxtrxmod.ControlEvent))
if event.values.get("Command") in COMMAND_ON_LIST:
self._state = True
self._attr_is_on = True
elif event.values.get("Command") in COMMAND_OFF_LIST:
self._state = False
self._attr_is_on = False
elif event.values.get("Sensor Status") in SENSOR_STATUS_ON:
self._state = True
self._attr_is_on = True
elif event.values.get("Sensor Status") in SENSOR_STATUS_OFF:
self._state = False
self._attr_is_on = False
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent):
"""Apply command from rfxtrx."""
@ -226,7 +218,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
def off_delay_listener(now):
"""Switch device off after a delay."""
self._delay_listener = None
self._state = False
self._attr_is_on = False
self.async_write_ha_state()
self._delay_listener = evt.async_call_later(

View File

@ -71,6 +71,21 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
"""Initialize the RFXtrx cover device."""
super().__init__(device, device_id, event)
self._venetian_blind_mode = venetian_blind_mode
self._attr_is_closed: bool | None = True
self._attr_supported_features = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
)
if venetian_blind_mode in (
CONST_VENETIAN_BLIND_MODE_US,
CONST_VENETIAN_BLIND_MODE_EU,
):
self._attr_supported_features |= (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
)
async def async_added_to_hass(self) -> None:
"""Restore device state."""
@ -79,31 +94,7 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
if self._event is None:
old_state = await self.async_get_last_state()
if old_state is not None:
self._state = old_state.state == STATE_OPEN
@property
def supported_features(self) -> int:
"""Flag supported features."""
supported_features = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
)
if self._venetian_blind_mode in (
CONST_VENETIAN_BLIND_MODE_US,
CONST_VENETIAN_BLIND_MODE_EU,
):
supported_features |= (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
)
return supported_features
@property
def is_closed(self) -> bool:
"""Return if the cover is closed."""
return not self._state
self._attr_is_closed = old_state.state != STATE_OPEN
async def async_open_cover(self, **kwargs: Any) -> None:
"""Move the cover up."""
@ -113,7 +104,7 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
await self._async_send(self._device.send_up2sec)
else:
await self._async_send(self._device.send_open)
self._state = True
self._attr_is_closed = False
self.async_write_ha_state()
async def async_close_cover(self, **kwargs: Any) -> None:
@ -124,13 +115,13 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
await self._async_send(self._device.send_down2sec)
else:
await self._async_send(self._device.send_close)
self._state = False
self._attr_is_closed = True
self.async_write_ha_state()
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self._async_send(self._device.send_stop)
self._state = True
self._attr_is_closed = False
self.async_write_ha_state()
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
@ -150,7 +141,7 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover tilt."""
await self._async_send(self._device.send_stop)
self._state = True
self._attr_is_closed = False
self.async_write_ha_state()
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent):
@ -158,9 +149,9 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
assert isinstance(event, rfxtrxmod.ControlEvent)
super()._apply_event(event)
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
self._attr_is_closed = False
elif event.values["Command"] in COMMAND_OFF_LIST:
self._state = False
self._attr_is_closed = True
@callback
def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple):

View File

@ -56,7 +56,7 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity):
_attr_color_mode = ColorMode.BRIGHTNESS
_attr_supported_color_modes = {ColorMode.BRIGHTNESS}
_brightness = 0
_attr_brightness: int = 0
_device: rfxtrxmod.LightingDevice
async def async_added_to_hass(self):
@ -66,37 +66,28 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity):
if self._event is None:
old_state = await self.async_get_last_state()
if old_state is not None:
self._state = old_state.state == STATE_ON
self._brightness = old_state.attributes.get(ATTR_BRIGHTNESS)
@property
def brightness(self):
"""Return the brightness of this light between 0..255."""
return self._brightness
@property
def is_on(self):
"""Return true if device is on."""
return self._state
self._attr_is_on = old_state.state == STATE_ON
if brightness := old_state.attributes.get(ATTR_BRIGHTNESS):
self._attr_brightness = int(brightness)
async def async_turn_on(self, **kwargs):
"""Turn the device on."""
brightness = kwargs.get(ATTR_BRIGHTNESS)
self._state = True
self._attr_is_on = True
if brightness is None:
await self._async_send(self._device.send_on)
self._brightness = 255
self._attr_brightness = 255
else:
await self._async_send(self._device.send_dim, brightness * 100 // 255)
self._brightness = brightness
self._attr_brightness = brightness
self.async_write_ha_state()
async def async_turn_off(self, **kwargs):
"""Turn the device off."""
await self._async_send(self._device.send_off)
self._state = False
self._brightness = 0
self._attr_is_on = False
self._attr_brightness = 0
self.async_write_ha_state()
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent):
@ -104,12 +95,13 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity):
assert isinstance(event, rfxtrxmod.ControlEvent)
super()._apply_event(event)
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
self._attr_is_on = True
elif event.values["Command"] in COMMAND_OFF_LIST:
self._state = False
self._attr_is_on = False
elif event.values["Command"] == "Set level":
self._brightness = event.values["Dim level"] * 255 // 100
self._state = self._brightness > 0
brightness = event.values["Dim level"] * 255 // 100
self._attr_brightness = brightness
self._attr_is_on = brightness > 0
@callback
def _handle_event(self, event, device_id):

View File

@ -273,15 +273,16 @@ async def async_setup_entry(
class RfxtrxSensor(RfxtrxEntity, SensorEntity):
"""Representation of a RFXtrx sensor."""
_attr_force_update = True
"""We should force updates. Repeated states have meaning."""
entity_description: RfxtrxSensorEntityDescription
def __init__(self, device, device_id, entity_description, event=None):
"""Initialize the sensor."""
super().__init__(device, device_id, event=event)
self.entity_description = entity_description
self._unique_id = "_".join(
x for x in (*self._device_id, entity_description.key)
)
self._attr_unique_id = "_".join(x for x in (*device_id, entity_description.key))
async def async_added_to_hass(self):
"""Restore device state."""
@ -302,16 +303,6 @@ class RfxtrxSensor(RfxtrxEntity, SensorEntity):
value = self._event.values.get(self.entity_description.key)
return self.entity_description.convert(value)
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def force_update(self) -> bool:
"""We should force updates. Repeated states have meaning."""
return True
@callback
def _handle_event(self, event, device_id):
"""Check if event applies to me and update."""

View File

@ -94,7 +94,7 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity):
if self._event is None:
old_state = await self.async_get_last_state()
if old_state is not None:
self._state = old_state.state == STATE_ON
self._attr_is_on = old_state.state == STATE_ON
def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent):
"""Apply event for a lighting 4 device."""
@ -103,18 +103,18 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity):
assert cmdstr
cmd = int(cmdstr, 16)
if cmd == self._cmd_on:
self._state = True
self._attr_is_on = True
elif cmd == self._cmd_off:
self._state = False
self._attr_is_on = False
else:
self._state = True
self._attr_is_on = True
def _apply_event_standard(self, event: rfxtrxmod.RFXtrxEvent) -> None:
assert isinstance(event, rfxtrxmod.ControlEvent)
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
self._attr_is_on = True
elif event.values["Command"] in COMMAND_OFF_LIST:
self._state = False
self._attr_is_on = False
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None:
"""Apply command from rfxtrx."""
@ -134,18 +134,13 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity):
self.async_write_ha_state()
@property
def is_on(self):
"""Return true if device is on."""
return self._state
async def async_turn_on(self, **kwargs):
"""Turn the device on."""
if self._cmd_on is not None:
await self._async_send(self._device.send_command, self._cmd_on)
else:
await self._async_send(self._device.send_on)
self._state = True
self._attr_is_on = True
self.async_write_ha_state()
async def async_turn_off(self, **kwargs):
@ -154,5 +149,5 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity):
await self._async_send(self._device.send_command, self._cmd_off)
else:
await self._async_send(self._device.send_off)
self._state = False
self._attr_is_on = False
self.async_write_ha_state()