Enable strict typing for rfxtrx (#74927)

* Additional typing

* Enable strict typing

* Avoid changes causing coverage change

* Adjust comment on force update

* Rename to replace_devices

* Reduce typing scope

* Adjust mypy
This commit is contained in:
Joakim Plate 2022-09-23 16:47:58 +02:00 committed by GitHub
parent 67779089cd
commit ace9592aa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 190 additions and 101 deletions

View File

@ -213,6 +213,7 @@ homeassistant.components.recorder.*
homeassistant.components.remote.* homeassistant.components.remote.*
homeassistant.components.renault.* homeassistant.components.renault.*
homeassistant.components.repairs.* homeassistant.components.repairs.*
homeassistant.components.rfxtrx.*
homeassistant.components.rhasspy.* homeassistant.components.rhasspy.*
homeassistant.components.ridwell.* homeassistant.components.ridwell.*
homeassistant.components.rituals_perfume_genie.* homeassistant.components.rituals_perfume_genie.*

View File

@ -6,7 +6,7 @@ import binascii
from collections.abc import Callable from collections.abc import Callable
import copy import copy
import logging import logging
from typing import NamedTuple, cast from typing import Any, NamedTuple, cast
import RFXtrx as rfxtrxmod import RFXtrx as rfxtrxmod
import async_timeout import async_timeout
@ -61,7 +61,7 @@ class DeviceTuple(NamedTuple):
id_string: str id_string: str
def _bytearray_string(data): def _bytearray_string(data: Any) -> bytearray:
val = cv.string(data) val = cv.string(data)
try: try:
return bytearray.fromhex(val) return bytearray.fromhex(val)
@ -116,7 +116,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return True return True
def _create_rfx(config): def _create_rfx(config: dict[str, Any]) -> rfxtrxmod.Connect:
"""Construct a rfx object based on config.""" """Construct a rfx object based on config."""
modes = config.get(CONF_PROTOCOLS) modes = config.get(CONF_PROTOCOLS)
@ -144,7 +144,9 @@ def _create_rfx(config):
return rfx return rfx
def _get_device_lookup(devices): def _get_device_lookup(
devices: dict[str, dict[str, Any]]
) -> dict[DeviceTuple, dict[str, Any]]:
"""Get a lookup structure for devices.""" """Get a lookup structure for devices."""
lookup = {} lookup = {}
for event_code, event_config in devices.items(): for event_code, event_config in devices.items():
@ -157,7 +159,7 @@ def _get_device_lookup(devices):
return lookup return lookup
async def async_setup_internal(hass, entry: ConfigEntry): async def async_setup_internal(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Set up the RFXtrx component.""" """Set up the RFXtrx component."""
config = entry.data config = entry.data
@ -173,7 +175,7 @@ async def async_setup_internal(hass, entry: ConfigEntry):
# Declare the Handle event # Declare the Handle event
@callback @callback
def async_handle_receive(event): def async_handle_receive(event: rfxtrxmod.RFXtrxEvent) -> None:
"""Handle received messages from RFXtrx gateway.""" """Handle received messages from RFXtrx gateway."""
# Log RFXCOM event # Log RFXCOM event
if not event.device.id_string: if not event.device.id_string:
@ -204,7 +206,7 @@ async def async_setup_internal(hass, entry: ConfigEntry):
pt2262_devices.append(event.device.id_string) pt2262_devices.append(event.device.id_string)
device_entry = device_registry.async_get_device( device_entry = device_registry.async_get_device(
identifiers={(DOMAIN, *device_id)}, identifiers={(DOMAIN, *device_id)}, # type: ignore[arg-type]
) )
if device_entry: if device_entry:
event_data[ATTR_DEVICE_ID] = device_entry.id event_data[ATTR_DEVICE_ID] = device_entry.id
@ -216,7 +218,7 @@ async def async_setup_internal(hass, entry: ConfigEntry):
hass.bus.async_fire(EVENT_RFXTRX_EVENT, event_data) hass.bus.async_fire(EVENT_RFXTRX_EVENT, event_data)
@callback @callback
def _add_device(event, device_id): def _add_device(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None:
"""Add a device to config entry.""" """Add a device to config entry."""
config = {} config = {}
config[CONF_DEVICE_ID] = device_id config[CONF_DEVICE_ID] = device_id
@ -237,7 +239,7 @@ async def async_setup_internal(hass, entry: ConfigEntry):
devices[device_id] = config devices[device_id] = config
@callback @callback
def _remove_device(device_id: DeviceTuple): def _remove_device(device_id: DeviceTuple) -> None:
data = { data = {
**entry.data, **entry.data,
CONF_DEVICES: { CONF_DEVICES: {
@ -250,7 +252,7 @@ async def async_setup_internal(hass, entry: ConfigEntry):
devices.pop(device_id) devices.pop(device_id)
@callback @callback
def _updated_device(event: Event): def _updated_device(event: Event) -> None:
if event.data["action"] != "remove": if event.data["action"] != "remove":
return return
device_entry = device_registry.deleted_devices[event.data["device_id"]] device_entry = device_registry.deleted_devices[event.data["device_id"]]
@ -264,7 +266,7 @@ async def async_setup_internal(hass, entry: ConfigEntry):
hass.bus.async_listen(dr.EVENT_DEVICE_REGISTRY_UPDATED, _updated_device) hass.bus.async_listen(dr.EVENT_DEVICE_REGISTRY_UPDATED, _updated_device)
) )
def _shutdown_rfxtrx(event): def _shutdown_rfxtrx(event: Event) -> None:
"""Close connection with RFXtrx.""" """Close connection with RFXtrx."""
rfx_object.close_connection() rfx_object.close_connection()
@ -288,10 +290,15 @@ async def async_setup_platform_entry(
async_add_entities: AddEntitiesCallback, async_add_entities: AddEntitiesCallback,
supported: Callable[[rfxtrxmod.RFXtrxEvent], bool], supported: Callable[[rfxtrxmod.RFXtrxEvent], bool],
constructor: Callable[ constructor: Callable[
[rfxtrxmod.RFXtrxEvent, rfxtrxmod.RFXtrxEvent | None, DeviceTuple, dict], [
rfxtrxmod.RFXtrxEvent,
rfxtrxmod.RFXtrxEvent | None,
DeviceTuple,
dict[str, Any],
],
list[Entity], list[Entity],
], ],
): ) -> None:
"""Set up config entry.""" """Set up config entry."""
entry_data = config_entry.data entry_data = config_entry.data
device_ids: set[DeviceTuple] = set() device_ids: set[DeviceTuple] = set()
@ -320,7 +327,7 @@ async def async_setup_platform_entry(
if entry_data[CONF_AUTOMATIC_ADD]: if entry_data[CONF_AUTOMATIC_ADD]:
@callback @callback
def _update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): def _update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None:
"""Handle light updates from the RFXtrx gateway.""" """Handle light updates from the RFXtrx gateway."""
if not supported(event): if not supported(event):
return return
@ -373,7 +380,7 @@ def get_pt2262_cmd(device_id: str, data_bits: int) -> str | None:
def get_device_data_bits( def get_device_data_bits(
device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict] device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict[str, Any]]
) -> int | None: ) -> int | None:
"""Deduce data bits for device based on a cache of device bits.""" """Deduce data bits for device based on a cache of device bits."""
data_bits = None data_bits = None
@ -488,9 +495,9 @@ class RfxtrxEntity(RestoreEntity):
self._device_id = device_id self._device_id = device_id
# If id_string is 213c7f2:1, the group_id is 213c7f2, and the device will respond to # If id_string is 213c7f2:1, the group_id is 213c7f2, and the device will respond to
# group events regardless of their group indices. # group events regardless of their group indices.
(self._group_id, _, _) = device.id_string.partition(":") (self._group_id, _, _) = cast(str, device.id_string).partition(":")
async def async_added_to_hass(self): async def async_added_to_hass(self) -> None:
"""Restore RFXtrx device state (ON/OFF).""" """Restore RFXtrx device state (ON/OFF)."""
if self._event: if self._event:
self._apply_event(self._event) self._apply_event(self._event)
@ -500,13 +507,15 @@ class RfxtrxEntity(RestoreEntity):
) )
@property @property
def extra_state_attributes(self): def extra_state_attributes(self) -> dict[str, str] | None:
"""Return the device state attributes.""" """Return the device state attributes."""
if not self._event: if not self._event:
return None return None
return {ATTR_EVENT: "".join(f"{x:02x}" for x in self._event.data)} return {ATTR_EVENT: "".join(f"{x:02x}" for x in self._event.data)}
def _event_applies(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): def _event_applies(
self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple
) -> bool:
"""Check if event applies to me.""" """Check if event applies to me."""
if isinstance(event, rfxtrxmod.ControlEvent): if isinstance(event, rfxtrxmod.ControlEvent):
if ( if (
@ -514,7 +523,7 @@ class RfxtrxEntity(RestoreEntity):
and event.values["Command"] in COMMAND_GROUP_LIST and event.values["Command"] in COMMAND_GROUP_LIST
): ):
device: rfxtrxmod.RFXtrxDevice = event.device device: rfxtrxmod.RFXtrxDevice = event.device
(group_id, _, _) = device.id_string.partition(":") (group_id, _, _) = cast(str, device.id_string).partition(":")
return group_id == self._group_id return group_id == self._group_id
# Otherwise, the event only applies to the matching device. # Otherwise, the event only applies to the matching device.
@ -546,6 +555,6 @@ class RfxtrxCommandEntity(RfxtrxEntity):
"""Initialzie a switch or light device.""" """Initialzie a switch or light device."""
super().__init__(device, device_id, event=event) super().__init__(device, device_id, event=event)
async def _async_send(self, fun, *args): async def _async_send(self, fun: Callable[..., None], *args: Any) -> None:
rfx_object = self.hass.data[DOMAIN][DATA_RFXOBJECT] rfx_object = self.hass.data[DOMAIN][DATA_RFXOBJECT]
await self.hass.async_add_executor_job(fun, rfx_object.transport, *args) await self.hass.async_add_executor_job(fun, rfx_object.transport, *args)

View File

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any
import RFXtrx as rfxtrxmod import RFXtrx as rfxtrxmod
@ -14,6 +15,7 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_COMMAND_OFF, CONF_COMMAND_ON, STATE_ON from homeassistant.const import CONF_COMMAND_OFF, CONF_COMMAND_ON, STATE_ON
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers import event as evt from homeassistant.helpers import event as evt
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DeviceTuple, RfxtrxEntity, async_setup_platform_entry, get_pt2262_cmd from . import DeviceTuple, RfxtrxEntity, async_setup_platform_entry, get_pt2262_cmd
@ -72,7 +74,7 @@ SENSOR_TYPES = (
SENSOR_TYPES_DICT = {desc.key: desc for desc in SENSOR_TYPES} SENSOR_TYPES_DICT = {desc.key: desc for desc in SENSOR_TYPES}
def supported(event: rfxtrxmod.RFXtrxEvent): def supported(event: rfxtrxmod.RFXtrxEvent) -> bool:
"""Return whether an event supports binary_sensor.""" """Return whether an event supports binary_sensor."""
if isinstance(event, rfxtrxmod.ControlEvent): if isinstance(event, rfxtrxmod.ControlEvent):
return True return True
@ -91,7 +93,7 @@ async def async_setup_entry(
) -> None: ) -> None:
"""Set up config entry.""" """Set up config entry."""
def get_sensor_description(type_string: str): def get_sensor_description(type_string: str) -> BinarySensorEntityDescription:
if (description := SENSOR_TYPES_DICT.get(type_string)) is None: if (description := SENSOR_TYPES_DICT.get(type_string)) is None:
return BinarySensorEntityDescription(key=type_string) return BinarySensorEntityDescription(key=type_string)
return description return description
@ -100,8 +102,8 @@ async def async_setup_entry(
event: rfxtrxmod.RFXtrxEvent, event: rfxtrxmod.RFXtrxEvent,
auto: rfxtrxmod.RFXtrxEvent | None, auto: rfxtrxmod.RFXtrxEvent | None,
device_id: DeviceTuple, device_id: DeviceTuple,
entity_info: dict, entity_info: dict[str, Any],
): ) -> list[Entity]:
return [ return [
RfxtrxBinarySensor( RfxtrxBinarySensor(
@ -122,10 +124,13 @@ async def async_setup_entry(
class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
"""A representation of a RFXtrx binary sensor.""" """A representation of a RFXtrx binary sensor.
Since all repeated events have meaning, these types of sensors
need to have force update enabled.
"""
_attr_force_update = True _attr_force_update = True
"""We should force updates. Repeated states have meaning."""
def __init__( def __init__(
self, self,
@ -159,7 +164,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
if self.is_on and self._off_delay is not None: if self.is_on and self._off_delay is not None:
self._attr_is_on = False self._attr_is_on = False
def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent): def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent) -> None:
"""Apply event for a lighting 4 device.""" """Apply event for a lighting 4 device."""
if self._data_bits is not None: if self._data_bits is not None:
cmdstr = get_pt2262_cmd(event.device.id_string, self._data_bits) cmdstr = get_pt2262_cmd(event.device.id_string, self._data_bits)
@ -172,7 +177,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
else: else:
self._attr_is_on = True self._attr_is_on = True
def _apply_event_standard(self, event: rfxtrxmod.RFXtrxEvent): def _apply_event_standard(self, event: rfxtrxmod.RFXtrxEvent) -> None:
assert isinstance(event, (rfxtrxmod.SensorEvent, rfxtrxmod.ControlEvent)) assert isinstance(event, (rfxtrxmod.SensorEvent, rfxtrxmod.ControlEvent))
if event.values.get("Command") in COMMAND_ON_LIST: if event.values.get("Command") in COMMAND_ON_LIST:
self._attr_is_on = True self._attr_is_on = True
@ -183,7 +188,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
elif event.values.get("Sensor Status") in SENSOR_STATUS_OFF: elif event.values.get("Sensor Status") in SENSOR_STATUS_OFF:
self._attr_is_on = False self._attr_is_on = False
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None:
"""Apply command from rfxtrx.""" """Apply command from rfxtrx."""
super()._apply_event(event) super()._apply_event(event)
if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4: if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
@ -192,7 +197,9 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
self._apply_event_standard(event) self._apply_event_standard(event)
@callback @callback
def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): def _handle_event(
self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple
) -> None:
"""Check if event applies to me and update.""" """Check if event applies to me and update."""
if not self._event_applies(event, device_id): if not self._event_applies(event, device_id):
return return
@ -215,7 +222,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
if self.is_on and self._off_delay is not None: if self.is_on and self._off_delay is not None:
@callback @callback
def off_delay_listener(now): def off_delay_listener(now: Any) -> None:
"""Switch device off after a delay.""" """Switch device off after a delay."""
self._delay_listener = None self._delay_listener = None
self._attr_is_on = False self._attr_is_on = False

View File

@ -4,14 +4,14 @@ from __future__ import annotations
import copy import copy
import itertools import itertools
import os import os
from typing import TypedDict, cast from typing import Any, TypedDict, cast
import RFXtrx as rfxtrxmod import RFXtrx as rfxtrxmod
import serial import serial
import serial.tools.list_ports import serial.tools.list_ports
import voluptuous as vol import voluptuous as vol
from homeassistant import config_entries, exceptions from homeassistant import config_entries, data_entry_flow, exceptions
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ( from homeassistant.const import (
CONF_COMMAND_OFF, CONF_COMMAND_OFF,
@ -64,7 +64,7 @@ class DeviceData(TypedDict):
device_id: DeviceTuple device_id: DeviceTuple
def none_or_int(value, base): def none_or_int(value: str | None, base: int) -> int | None:
"""Check if strin is one otherwise convert to int.""" """Check if strin is one otherwise convert to int."""
if value is None: if value is None:
return None return None
@ -80,17 +80,21 @@ class OptionsFlow(config_entries.OptionsFlow):
def __init__(self, config_entry: ConfigEntry) -> None: def __init__(self, config_entry: ConfigEntry) -> None:
"""Initialize rfxtrx options flow.""" """Initialize rfxtrx options flow."""
self._config_entry = config_entry self._config_entry = config_entry
self._global_options = None self._global_options: dict[str, Any] = {}
self._selected_device = None self._selected_device: dict[str, Any] = {}
self._selected_device_entry_id: str | None = None self._selected_device_entry_id: str | None = None
self._selected_device_event_code: str | None = None self._selected_device_event_code: str | None = None
self._selected_device_object: rfxtrxmod.RFXtrxEvent | None = None self._selected_device_object: rfxtrxmod.RFXtrxEvent | None = None
async def async_step_init(self, user_input=None): async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Manage the options.""" """Manage the options."""
return await self.async_step_prompt_options() return await self.async_step_prompt_options()
async def async_step_prompt_options(self, user_input=None): async def async_step_prompt_options(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Prompt for options.""" """Prompt for options."""
errors = {} errors = {}
@ -103,7 +107,8 @@ class OptionsFlow(config_entries.OptionsFlow):
entry_id = user_input[CONF_DEVICE] entry_id = user_input[CONF_DEVICE]
device_data = self._get_device_data(entry_id) device_data = self._get_device_data(entry_id)
self._selected_device_entry_id = entry_id self._selected_device_entry_id = entry_id
event_code = device_data[CONF_EVENT_CODE] event_code = device_data["event_code"]
assert event_code
self._selected_device_event_code = event_code self._selected_device_event_code = event_code
self._selected_device = self._config_entry.data[CONF_DEVICES][ self._selected_device = self._config_entry.data[CONF_DEVICES][
event_code event_code
@ -111,7 +116,9 @@ class OptionsFlow(config_entries.OptionsFlow):
self._selected_device_object = get_rfx_object(event_code) self._selected_device_object = get_rfx_object(event_code)
return await self.async_step_set_device_options() return await self.async_step_set_device_options()
if CONF_EVENT_CODE in user_input: if CONF_EVENT_CODE in user_input:
self._selected_device_event_code = user_input[CONF_EVENT_CODE] self._selected_device_event_code = cast(
str, user_input[CONF_EVENT_CODE]
)
self._selected_device = {} self._selected_device = {}
selected_device_object = get_rfx_object( selected_device_object = get_rfx_object(
self._selected_device_event_code self._selected_device_event_code
@ -159,13 +166,17 @@ class OptionsFlow(config_entries.OptionsFlow):
step_id="prompt_options", data_schema=vol.Schema(options), errors=errors step_id="prompt_options", data_schema=vol.Schema(options), errors=errors
) )
async def async_step_set_device_options(self, user_input=None): async def async_step_set_device_options(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Manage device options.""" """Manage device options."""
errors = {} errors = {}
if user_input is not None:
assert self._selected_device_object assert self._selected_device_object
assert self._selected_device_event_code assert self._selected_device_event_code
if user_input is not None:
devices: dict[str, dict[str, Any] | None] = {}
device: dict[str, Any]
device_id = get_device_id( device_id = get_device_id(
self._selected_device_object.device, self._selected_device_object.device,
data_bits=user_input.get(CONF_DATA_BITS), data_bits=user_input.get(CONF_DATA_BITS),
@ -278,16 +289,16 @@ class OptionsFlow(config_entries.OptionsFlow):
), ),
} }
) )
devices = { replace_devices = {
entry.id: entry.name_by_user if entry.name_by_user else entry.name entry.id: entry.name_by_user if entry.name_by_user else entry.name
for entry in self._device_entries for entry in self._device_entries
if self._can_replace_device(entry.id) if self._can_replace_device(entry.id)
} }
if devices: if replace_devices:
data_schema.update( data_schema.update(
{ {
vol.Optional(CONF_REPLACE_DEVICE): vol.In(devices), vol.Optional(CONF_REPLACE_DEVICE): vol.In(replace_devices),
} }
) )
@ -297,11 +308,13 @@ class OptionsFlow(config_entries.OptionsFlow):
errors=errors, errors=errors,
) )
async def _async_replace_device(self, replace_device): async def _async_replace_device(self, replace_device: str) -> None:
"""Migrate properties of a device into another.""" """Migrate properties of a device into another."""
device_registry = self._device_registry device_registry = self._device_registry
old_device = self._selected_device_entry_id old_device = self._selected_device_entry_id
assert old_device
old_entry = device_registry.async_get(old_device) old_entry = device_registry.async_get(old_device)
assert old_entry
device_registry.async_update_device( device_registry.async_update_device(
replace_device, replace_device,
area_id=old_entry.area_id, area_id=old_entry.area_id,
@ -343,23 +356,29 @@ class OptionsFlow(config_entries.OptionsFlow):
device_registry.async_remove_device(old_device) device_registry.async_remove_device(old_device)
def _can_add_device(self, new_rfx_obj): def _can_add_device(self, new_rfx_obj: rfxtrxmod.RFXtrxEvent) -> bool:
"""Check if device does not already exist.""" """Check if device does not already exist."""
new_device_id = get_device_id(new_rfx_obj.device) new_device_id = get_device_id(new_rfx_obj.device)
for packet_id, entity_info in self._config_entry.data[CONF_DEVICES].items(): for packet_id, entity_info in self._config_entry.data[CONF_DEVICES].items():
rfx_obj = get_rfx_object(packet_id) rfx_obj = get_rfx_object(packet_id)
assert rfx_obj
device_id = get_device_id(rfx_obj.device, entity_info.get(CONF_DATA_BITS)) device_id = get_device_id(rfx_obj.device, entity_info.get(CONF_DATA_BITS))
if new_device_id == device_id: if new_device_id == device_id:
return False return False
return True return True
def _can_replace_device(self, entry_id): def _can_replace_device(self, entry_id: str) -> bool:
"""Check if device can be replaced with selected device.""" """Check if device can be replaced with selected device."""
assert self._selected_device_object
device_data = self._get_device_data(entry_id) device_data = self._get_device_data(entry_id)
if (event_code := device_data[CONF_EVENT_CODE]) is not None: if (event_code := device_data["event_code"]) is not None:
rfx_obj = get_rfx_object(event_code) rfx_obj = get_rfx_object(event_code)
assert rfx_obj
if ( if (
rfx_obj.device.packettype rfx_obj.device.packettype
== self._selected_device_object.device.packettype == self._selected_device_object.device.packettype
@ -371,12 +390,12 @@ class OptionsFlow(config_entries.OptionsFlow):
return False return False
def _get_device_event_code(self, entry_id): def _get_device_event_code(self, entry_id: str) -> str | None:
data = self._get_device_data(entry_id) data = self._get_device_data(entry_id)
return data[CONF_EVENT_CODE] return data["event_code"]
def _get_device_data(self, entry_id) -> DeviceData: def _get_device_data(self, entry_id: str) -> DeviceData:
"""Get event code based on device identifier.""" """Get event code based on device identifier."""
event_code: str | None = None event_code: str | None = None
entry = self._device_registry.async_get(entry_id) entry = self._device_registry.async_get(entry_id)
@ -390,7 +409,11 @@ class OptionsFlow(config_entries.OptionsFlow):
return DeviceData(event_code=event_code, device_id=device_id) return DeviceData(event_code=event_code, device_id=device_id)
@callback @callback
def update_config_data(self, global_options=None, devices=None): def update_config_data(
self,
global_options: dict[str, Any] | None = None,
devices: dict[str, Any] | None = None,
) -> None:
"""Update data in ConfigEntry.""" """Update data in ConfigEntry."""
entry_data = self._config_entry.data.copy() entry_data = self._config_entry.data.copy()
entry_data[CONF_DEVICES] = copy.deepcopy(self._config_entry.data[CONF_DEVICES]) entry_data[CONF_DEVICES] = copy.deepcopy(self._config_entry.data[CONF_DEVICES])
@ -413,12 +436,14 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1 VERSION = 1
async def async_step_user(self, user_input=None): async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Step when user initializes a integration.""" """Step when user initializes a integration."""
await self.async_set_unique_id(DOMAIN) await self.async_set_unique_id(DOMAIN)
self._abort_if_unique_id_configured() self._abort_if_unique_id_configured()
errors = {} errors: dict[str, str] = {}
if user_input is not None: if user_input is not None:
if user_input[CONF_TYPE] == "Serial": if user_input[CONF_TYPE] == "Serial":
return await self.async_step_setup_serial() return await self.async_step_setup_serial()
@ -430,9 +455,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
schema = vol.Schema({vol.Required(CONF_TYPE): vol.In(list_of_types)}) schema = vol.Schema({vol.Required(CONF_TYPE): vol.In(list_of_types)})
return self.async_show_form(step_id="user", data_schema=schema, errors=errors) return self.async_show_form(step_id="user", data_schema=schema, errors=errors)
async def async_step_setup_network(self, user_input=None): async def async_step_setup_network(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Step when setting up network configuration.""" """Step when setting up network configuration."""
errors = {} errors: dict[str, str] = {}
if user_input is not None: if user_input is not None:
host = user_input[CONF_HOST] host = user_input[CONF_HOST]
@ -455,9 +482,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors=errors, errors=errors,
) )
async def async_step_setup_serial(self, user_input=None): async def async_step_setup_serial(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Step when setting up serial configuration.""" """Step when setting up serial configuration."""
errors = {} errors: dict[str, str] = {}
if user_input is not None: if user_input is not None:
user_selection = user_input[CONF_DEVICE] user_selection = user_input[CONF_DEVICE]
@ -493,9 +522,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors=errors, errors=errors,
) )
async def async_step_setup_serial_manual_path(self, user_input=None): async def async_step_setup_serial_manual_path(
self, user_input: dict[str, Any] | None = None
) -> data_entry_flow.FlowResult:
"""Select path manually.""" """Select path manually."""
errors = {} errors: dict[str, str] = {}
if user_input is not None: if user_input is not None:
device = user_input[CONF_DEVICE] device = user_input[CONF_DEVICE]
@ -514,7 +545,12 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors=errors, errors=errors,
) )
async def async_validate_rfx(self, host=None, port=None, device=None): async def async_validate_rfx(
self,
host: str | None = None,
port: int | None = None,
device: str | None = None,
) -> dict[str, Any]:
"""Create data for rfxtrx entry.""" """Create data for rfxtrx entry."""
success = await self.hass.async_add_executor_job( success = await self.hass.async_add_executor_job(
_test_transport, host, port, device _test_transport, host, port, device
@ -522,7 +558,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
if not success: if not success:
raise CannotConnect raise CannotConnect
data = { data: dict[str, Any] = {
CONF_HOST: host, CONF_HOST: host,
CONF_PORT: port, CONF_PORT: port,
CONF_DEVICE: device, CONF_DEVICE: device,
@ -538,7 +574,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
return OptionsFlow(config_entry) return OptionsFlow(config_entry)
def _test_transport(host, port, device): def _test_transport(host: str | None, port: int | None, device: str | None) -> bool:
"""Construct a rfx object based on config.""" """Construct a rfx object based on config."""
if port is not None: if port is not None:
try: try:

View File

@ -10,6 +10,7 @@ from homeassistant.components.cover import CoverEntity, CoverEntityFeature
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_OPEN from homeassistant.const import STATE_OPEN
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DeviceTuple, RfxtrxCommandEntity, async_setup_platform_entry from . import DeviceTuple, RfxtrxCommandEntity, async_setup_platform_entry
@ -24,9 +25,9 @@ from .const import (
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def supported(event: rfxtrxmod.RFXtrxEvent): def supported(event: rfxtrxmod.RFXtrxEvent) -> bool:
"""Return whether an event supports cover.""" """Return whether an event supports cover."""
return event.device.known_to_be_rollershutter return bool(event.device.known_to_be_rollershutter)
async def async_setup_entry( async def async_setup_entry(
@ -40,8 +41,8 @@ async def async_setup_entry(
event: rfxtrxmod.RFXtrxEvent, event: rfxtrxmod.RFXtrxEvent,
auto: rfxtrxmod.RFXtrxEvent | None, auto: rfxtrxmod.RFXtrxEvent | None,
device_id: DeviceTuple, device_id: DeviceTuple,
entity_info: dict, entity_info: dict[str, Any],
): ) -> list[Entity]:
return [ return [
RfxtrxCover( RfxtrxCover(
event.device, event.device,
@ -144,7 +145,7 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
self._attr_is_closed = False self._attr_is_closed = False
self.async_write_ha_state() self.async_write_ha_state()
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None:
"""Apply command from rfxtrx.""" """Apply command from rfxtrx."""
assert isinstance(event, rfxtrxmod.ControlEvent) assert isinstance(event, rfxtrxmod.ControlEvent)
super()._apply_event(event) super()._apply_event(event)
@ -154,7 +155,9 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
self._attr_is_closed = True self._attr_is_closed = True
@callback @callback
def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): def _handle_event(
self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple
) -> None:
"""Check if event applies to me and update.""" """Check if event applies to me and update."""
if device_id != self._device_id: if device_id != self._device_id:
return return

View File

@ -1,6 +1,8 @@
"""Provides device automations for RFXCOM RFXtrx.""" """Provides device automations for RFXCOM RFXtrx."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Callable
import voluptuous as vol import voluptuous as vol
from homeassistant.components.device_automation.exceptions import ( from homeassistant.components.device_automation.exceptions import (
@ -65,7 +67,9 @@ async def async_get_actions(
return actions return actions
def _get_commands(hass, device_id, action_type): def _get_commands(
hass: HomeAssistant, device_id: str, action_type: str
) -> tuple[dict[str, str], Callable[..., None]]:
device = async_get_device_object(hass, device_id) device = async_get_device_object(hass, device_id)
send_fun = getattr(device, action_type) send_fun = getattr(device, action_type)
commands = getattr(device, ACTION_SELECTION[action_type], {}) commands = getattr(device, ACTION_SELECTION[action_type], {})

View File

@ -1,14 +1,14 @@
"""Provides helpers for RFXtrx.""" """Provides helpers for RFXtrx."""
from RFXtrx import get_device from RFXtrx import RFXtrxDevice, get_device
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr from homeassistant.helpers import device_registry as dr
@callback @callback
def async_get_device_object(hass: HomeAssistant, device_id): def async_get_device_object(hass: HomeAssistant, device_id: str) -> RFXtrxDevice:
"""Get a device for the given device registry id.""" """Get a device for the given device registry id."""
device_registry = dr.async_get(hass) device_registry = dr.async_get(hass)
registry_device = device_registry.async_get(device_id) registry_device = device_registry.async_get(device_id)

View File

@ -10,6 +10,7 @@ from homeassistant.components.light import ATTR_BRIGHTNESS, ColorMode, LightEnti
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_ON from homeassistant.const import STATE_ON
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import DeviceTuple, RfxtrxCommandEntity, async_setup_platform_entry from . import DeviceTuple, RfxtrxCommandEntity, async_setup_platform_entry
@ -18,7 +19,7 @@ from .const import COMMAND_OFF_LIST, COMMAND_ON_LIST
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def supported(event: rfxtrxmod.RFXtrxEvent): def supported(event: rfxtrxmod.RFXtrxEvent) -> bool:
"""Return whether an event supports light.""" """Return whether an event supports light."""
return ( return (
isinstance(event.device, rfxtrxmod.LightingDevice) isinstance(event.device, rfxtrxmod.LightingDevice)
@ -37,8 +38,8 @@ async def async_setup_entry(
event: rfxtrxmod.RFXtrxEvent, event: rfxtrxmod.RFXtrxEvent,
auto: rfxtrxmod.RFXtrxEvent | None, auto: rfxtrxmod.RFXtrxEvent | None,
device_id: DeviceTuple, device_id: DeviceTuple,
entity_info: dict, entity_info: dict[str, Any],
): ) -> list[Entity]:
return [ return [
RfxtrxLight( RfxtrxLight(
event.device, event.device,
@ -91,7 +92,7 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity):
self._attr_brightness = 0 self._attr_brightness = 0
self.async_write_ha_state() self.async_write_ha_state()
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None:
"""Apply command from rfxtrx.""" """Apply command from rfxtrx."""
assert isinstance(event, rfxtrxmod.ControlEvent) assert isinstance(event, rfxtrxmod.ControlEvent)
super()._apply_event(event) super()._apply_event(event)
@ -105,7 +106,9 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity):
self._attr_is_on = brightness > 0 self._attr_is_on = brightness > 0
@callback @callback
def _handle_event(self, event, device_id): def _handle_event(
self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple
) -> None:
"""Check if event applies to me and update.""" """Check if event applies to me and update."""
if device_id != self._device_id: if device_id != self._device_id:
return return

View File

@ -3,9 +3,12 @@ from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date, datetime
from decimal import Decimal
import logging import logging
from typing import Any, cast
from RFXtrx import ControlEvent, RFXtrxEvent, SensorEvent from RFXtrx import ControlEvent, RFXtrxDevice, RFXtrxEvent, SensorEvent
from homeassistant.components.sensor import ( from homeassistant.components.sensor import (
SensorDeviceClass, SensorDeviceClass,
@ -30,8 +33,9 @@ from homeassistant.const import (
UV_INDEX, UV_INDEX,
) )
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity import Entity, EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import StateType
from . import DeviceTuple, RfxtrxEntity, async_setup_platform_entry, get_rfx_object from . import DeviceTuple, RfxtrxEntity, async_setup_platform_entry, get_rfx_object
from .const import ATTR_EVENT from .const import ATTR_EVENT
@ -39,14 +43,14 @@ from .const import ATTR_EVENT
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def _battery_convert(value): def _battery_convert(value: int | None) -> int | None:
"""Battery is given as a value between 0 and 9.""" """Battery is given as a value between 0 and 9."""
if value is None: if value is None:
return None return None
return (value + 1) * 10 return (value + 1) * 10
def _rssi_convert(value): def _rssi_convert(value: int | None) -> str | None:
"""Rssi is given as dBm value.""" """Rssi is given as dBm value."""
if value is None: if value is None:
return None return None
@ -57,7 +61,9 @@ def _rssi_convert(value):
class RfxtrxSensorEntityDescription(SensorEntityDescription): class RfxtrxSensorEntityDescription(SensorEntityDescription):
"""Description of sensor entities.""" """Description of sensor entities."""
convert: Callable = lambda x: x convert: Callable[[Any], StateType | date | datetime | Decimal] = lambda x: cast(
StateType, x
)
SENSOR_TYPES = ( SENSOR_TYPES = (
@ -243,16 +249,16 @@ async def async_setup_entry(
) -> None: ) -> None:
"""Set up config entry.""" """Set up config entry."""
def _supported(event): def _supported(event: RFXtrxEvent) -> bool:
return isinstance(event, (ControlEvent, SensorEvent)) return isinstance(event, (ControlEvent, SensorEvent))
def _constructor( def _constructor(
event: RFXtrxEvent, event: RFXtrxEvent,
auto: RFXtrxEvent | None, auto: RFXtrxEvent | None,
device_id: DeviceTuple, device_id: DeviceTuple,
entity_info: dict, entity_info: dict[str, Any],
): ) -> list[Entity]:
entities: list[RfxtrxSensor] = [] entities: list[Entity] = []
for data_type in set(event.values) & set(SENSOR_TYPES_DICT): for data_type in set(event.values) & set(SENSOR_TYPES_DICT):
entities.append( entities.append(
RfxtrxSensor( RfxtrxSensor(
@ -271,14 +277,22 @@ async def async_setup_entry(
class RfxtrxSensor(RfxtrxEntity, SensorEntity): class RfxtrxSensor(RfxtrxEntity, SensorEntity):
"""Representation of a RFXtrx sensor.""" """Representation of a RFXtrx sensor.
Since all repeated events have meaning, these types of sensors
need to have force update enabled.
"""
_attr_force_update = True _attr_force_update = True
"""We should force updates. Repeated states have meaning."""
entity_description: RfxtrxSensorEntityDescription entity_description: RfxtrxSensorEntityDescription
def __init__(self, device, device_id, entity_description, event=None): def __init__(
self,
device: RFXtrxDevice,
device_id: DeviceTuple,
entity_description: RfxtrxSensorEntityDescription,
event: RFXtrxEvent | None = None,
) -> None:
"""Initialize the sensor.""" """Initialize the sensor."""
super().__init__(device, device_id, event=event) super().__init__(device, device_id, event=event)
self.entity_description = entity_description self.entity_description = entity_description
@ -296,7 +310,7 @@ class RfxtrxSensor(RfxtrxEntity, SensorEntity):
self._apply_event(get_rfx_object(event)) self._apply_event(get_rfx_object(event))
@property @property
def native_value(self): def native_value(self) -> StateType | date | datetime | Decimal:
"""Return the state of the sensor.""" """Return the state of the sensor."""
if not self._event: if not self._event:
return None return None
@ -304,7 +318,7 @@ class RfxtrxSensor(RfxtrxEntity, SensorEntity):
return self.entity_description.convert(value) return self.entity_description.convert(value)
@callback @callback
def _handle_event(self, event, device_id): def _handle_event(self, event: RFXtrxEvent, device_id: DeviceTuple) -> None:
"""Check if event applies to me and update.""" """Check if event applies to me and update."""
if device_id != self._device_id: if device_id != self._device_id:
return return

View File

@ -58,8 +58,8 @@ async def async_setup_entry(
event: rfxtrxmod.RFXtrxEvent, event: rfxtrxmod.RFXtrxEvent,
auto: rfxtrxmod.RFXtrxEvent | None, auto: rfxtrxmod.RFXtrxEvent | None,
device_id: DeviceTuple, device_id: DeviceTuple,
entity_info: dict, entity_info: dict[str, Any],
): ) -> list[Entity]:
"""Construct a entity from an event.""" """Construct a entity from an event."""
device = event.device device = event.device
@ -85,6 +85,7 @@ async def async_setup_entry(
auto, auto,
) )
] ]
return []
await async_setup_platform_entry( await async_setup_platform_entry(
hass, config_entry, async_add_entities, supported, _constructor hass, config_entry, async_add_entities, supported, _constructor

View File

@ -10,6 +10,7 @@ from homeassistant.components.switch import SwitchEntity
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_COMMAND_OFF, CONF_COMMAND_ON, STATE_ON from homeassistant.const import CONF_COMMAND_OFF, CONF_COMMAND_ON, STATE_ON
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import ( from . import (
@ -31,7 +32,7 @@ DATA_SWITCH = f"{DOMAIN}_switch"
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def supported(event): def supported(event: rfxtrxmod.RFXtrxEvent) -> bool:
"""Return whether an event supports switch.""" """Return whether an event supports switch."""
return ( return (
isinstance(event.device, rfxtrxmod.LightingDevice) isinstance(event.device, rfxtrxmod.LightingDevice)
@ -52,8 +53,8 @@ async def async_setup_entry(
event: rfxtrxmod.RFXtrxEvent, event: rfxtrxmod.RFXtrxEvent,
auto: rfxtrxmod.RFXtrxEvent | None, auto: rfxtrxmod.RFXtrxEvent | None,
device_id: DeviceTuple, device_id: DeviceTuple,
entity_info: dict, entity_info: dict[str, Any],
): ) -> list[Entity]:
return [ return [
RfxtrxSwitch( RfxtrxSwitch(
event.device, event.device,
@ -97,7 +98,7 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity):
if old_state is not None: if old_state is not None:
self._attr_is_on = old_state.state == STATE_ON self._attr_is_on = old_state.state == STATE_ON
def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent): def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent) -> None:
"""Apply event for a lighting 4 device.""" """Apply event for a lighting 4 device."""
if self._data_bits is not None: if self._data_bits is not None:
cmdstr = get_pt2262_cmd(event.device.id_string, self._data_bits) cmdstr = get_pt2262_cmd(event.device.id_string, self._data_bits)

View File

@ -1882,6 +1882,16 @@ disallow_untyped_defs = true
warn_return_any = true warn_return_any = true
warn_unreachable = true warn_unreachable = true
[mypy-homeassistant.components.rfxtrx.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.rhasspy.*] [mypy-homeassistant.components.rhasspy.*]
check_untyped_defs = true check_untyped_defs = true
disallow_incomplete_defs = true disallow_incomplete_defs = true