diff --git a/.strict-typing b/.strict-typing index 8938e694a7e..9df3d16dc43 100644 --- a/.strict-typing +++ b/.strict-typing @@ -213,6 +213,7 @@ homeassistant.components.recorder.* homeassistant.components.remote.* homeassistant.components.renault.* homeassistant.components.repairs.* +homeassistant.components.rfxtrx.* homeassistant.components.rhasspy.* homeassistant.components.ridwell.* homeassistant.components.rituals_perfume_genie.* diff --git a/homeassistant/components/rfxtrx/__init__.py b/homeassistant/components/rfxtrx/__init__.py index 3257a482c0c..31a0e4694ba 100644 --- a/homeassistant/components/rfxtrx/__init__.py +++ b/homeassistant/components/rfxtrx/__init__.py @@ -6,7 +6,7 @@ import binascii from collections.abc import Callable import copy import logging -from typing import NamedTuple, cast +from typing import Any, NamedTuple, cast import RFXtrx as rfxtrxmod import async_timeout @@ -61,7 +61,7 @@ class DeviceTuple(NamedTuple): id_string: str -def _bytearray_string(data): +def _bytearray_string(data: Any) -> bytearray: val = cv.string(data) try: return bytearray.fromhex(val) @@ -116,7 +116,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: return True -def _create_rfx(config): +def _create_rfx(config: dict[str, Any]) -> rfxtrxmod.Connect: """Construct a rfx object based on config.""" modes = config.get(CONF_PROTOCOLS) @@ -144,7 +144,9 @@ def _create_rfx(config): return rfx -def _get_device_lookup(devices): +def _get_device_lookup( + devices: dict[str, dict[str, Any]] +) -> dict[DeviceTuple, dict[str, Any]]: """Get a lookup structure for devices.""" lookup = {} for event_code, event_config in devices.items(): @@ -157,7 +159,7 @@ def _get_device_lookup(devices): return lookup -async def async_setup_internal(hass, entry: ConfigEntry): +async def async_setup_internal(hass: HomeAssistant, entry: ConfigEntry) -> None: """Set up the RFXtrx component.""" config = entry.data @@ -173,7 +175,7 @@ async def async_setup_internal(hass, entry: ConfigEntry): # Declare the Handle event @callback - def async_handle_receive(event): + def async_handle_receive(event: rfxtrxmod.RFXtrxEvent) -> None: """Handle received messages from RFXtrx gateway.""" # Log RFXCOM event if not event.device.id_string: @@ -204,7 +206,7 @@ async def async_setup_internal(hass, entry: ConfigEntry): pt2262_devices.append(event.device.id_string) device_entry = device_registry.async_get_device( - identifiers={(DOMAIN, *device_id)}, + identifiers={(DOMAIN, *device_id)}, # type: ignore[arg-type] ) if device_entry: event_data[ATTR_DEVICE_ID] = device_entry.id @@ -216,7 +218,7 @@ async def async_setup_internal(hass, entry: ConfigEntry): hass.bus.async_fire(EVENT_RFXTRX_EVENT, event_data) @callback - def _add_device(event, device_id): + def _add_device(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None: """Add a device to config entry.""" config = {} config[CONF_DEVICE_ID] = device_id @@ -237,7 +239,7 @@ async def async_setup_internal(hass, entry: ConfigEntry): devices[device_id] = config @callback - def _remove_device(device_id: DeviceTuple): + def _remove_device(device_id: DeviceTuple) -> None: data = { **entry.data, CONF_DEVICES: { @@ -250,7 +252,7 @@ async def async_setup_internal(hass, entry: ConfigEntry): devices.pop(device_id) @callback - def _updated_device(event: Event): + def _updated_device(event: Event) -> None: if event.data["action"] != "remove": return device_entry = device_registry.deleted_devices[event.data["device_id"]] @@ -264,7 +266,7 @@ async def async_setup_internal(hass, entry: ConfigEntry): hass.bus.async_listen(dr.EVENT_DEVICE_REGISTRY_UPDATED, _updated_device) ) - def _shutdown_rfxtrx(event): + def _shutdown_rfxtrx(event: Event) -> None: """Close connection with RFXtrx.""" rfx_object.close_connection() @@ -288,10 +290,15 @@ async def async_setup_platform_entry( async_add_entities: AddEntitiesCallback, supported: Callable[[rfxtrxmod.RFXtrxEvent], bool], constructor: Callable[ - [rfxtrxmod.RFXtrxEvent, rfxtrxmod.RFXtrxEvent | None, DeviceTuple, dict], + [ + rfxtrxmod.RFXtrxEvent, + rfxtrxmod.RFXtrxEvent | None, + DeviceTuple, + dict[str, Any], + ], list[Entity], ], -): +) -> None: """Set up config entry.""" entry_data = config_entry.data device_ids: set[DeviceTuple] = set() @@ -320,7 +327,7 @@ async def async_setup_platform_entry( if entry_data[CONF_AUTOMATIC_ADD]: @callback - def _update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): + def _update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None: """Handle light updates from the RFXtrx gateway.""" if not supported(event): return @@ -373,7 +380,7 @@ def get_pt2262_cmd(device_id: str, data_bits: int) -> str | None: def get_device_data_bits( - device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict] + device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict[str, Any]] ) -> int | None: """Deduce data bits for device based on a cache of device bits.""" data_bits = None @@ -488,9 +495,9 @@ class RfxtrxEntity(RestoreEntity): self._device_id = device_id # If id_string is 213c7f2:1, the group_id is 213c7f2, and the device will respond to # group events regardless of their group indices. - (self._group_id, _, _) = device.id_string.partition(":") + (self._group_id, _, _) = cast(str, device.id_string).partition(":") - async def async_added_to_hass(self): + async def async_added_to_hass(self) -> None: """Restore RFXtrx device state (ON/OFF).""" if self._event: self._apply_event(self._event) @@ -500,13 +507,15 @@ class RfxtrxEntity(RestoreEntity): ) @property - def extra_state_attributes(self): + def extra_state_attributes(self) -> dict[str, str] | None: """Return the device state attributes.""" if not self._event: return None return {ATTR_EVENT: "".join(f"{x:02x}" for x in self._event.data)} - def _event_applies(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): + def _event_applies( + self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple + ) -> bool: """Check if event applies to me.""" if isinstance(event, rfxtrxmod.ControlEvent): if ( @@ -514,7 +523,7 @@ class RfxtrxEntity(RestoreEntity): and event.values["Command"] in COMMAND_GROUP_LIST ): device: rfxtrxmod.RFXtrxDevice = event.device - (group_id, _, _) = device.id_string.partition(":") + (group_id, _, _) = cast(str, device.id_string).partition(":") return group_id == self._group_id # Otherwise, the event only applies to the matching device. @@ -546,6 +555,6 @@ class RfxtrxCommandEntity(RfxtrxEntity): """Initialzie a switch or light device.""" super().__init__(device, device_id, event=event) - async def _async_send(self, fun, *args): + async def _async_send(self, fun: Callable[..., None], *args: Any) -> None: rfx_object = self.hass.data[DOMAIN][DATA_RFXOBJECT] await self.hass.async_add_executor_job(fun, rfx_object.transport, *args) diff --git a/homeassistant/components/rfxtrx/binary_sensor.py b/homeassistant/components/rfxtrx/binary_sensor.py index df79c17263c..03ad7ce071b 100644 --- a/homeassistant/components/rfxtrx/binary_sensor.py +++ b/homeassistant/components/rfxtrx/binary_sensor.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging +from typing import Any import RFXtrx as rfxtrxmod @@ -14,6 +15,7 @@ from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_COMMAND_OFF, CONF_COMMAND_ON, STATE_ON from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback from homeassistant.helpers import event as evt +from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity_platform import AddEntitiesCallback from . import DeviceTuple, RfxtrxEntity, async_setup_platform_entry, get_pt2262_cmd @@ -72,7 +74,7 @@ SENSOR_TYPES = ( SENSOR_TYPES_DICT = {desc.key: desc for desc in SENSOR_TYPES} -def supported(event: rfxtrxmod.RFXtrxEvent): +def supported(event: rfxtrxmod.RFXtrxEvent) -> bool: """Return whether an event supports binary_sensor.""" if isinstance(event, rfxtrxmod.ControlEvent): return True @@ -91,7 +93,7 @@ async def async_setup_entry( ) -> None: """Set up config entry.""" - def get_sensor_description(type_string: str): + def get_sensor_description(type_string: str) -> BinarySensorEntityDescription: if (description := SENSOR_TYPES_DICT.get(type_string)) is None: return BinarySensorEntityDescription(key=type_string) return description @@ -100,8 +102,8 @@ async def async_setup_entry( event: rfxtrxmod.RFXtrxEvent, auto: rfxtrxmod.RFXtrxEvent | None, device_id: DeviceTuple, - entity_info: dict, - ): + entity_info: dict[str, Any], + ) -> list[Entity]: return [ RfxtrxBinarySensor( @@ -122,10 +124,13 @@ async def async_setup_entry( class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): - """A representation of a RFXtrx binary sensor.""" + """A representation of a RFXtrx binary sensor. + + Since all repeated events have meaning, these types of sensors + need to have force update enabled. + """ _attr_force_update = True - """We should force updates. Repeated states have meaning.""" def __init__( self, @@ -159,7 +164,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): if self.is_on and self._off_delay is not None: self._attr_is_on = False - def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent): + def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent) -> None: """Apply event for a lighting 4 device.""" if self._data_bits is not None: cmdstr = get_pt2262_cmd(event.device.id_string, self._data_bits) @@ -172,7 +177,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): else: self._attr_is_on = True - def _apply_event_standard(self, event: rfxtrxmod.RFXtrxEvent): + def _apply_event_standard(self, event: rfxtrxmod.RFXtrxEvent) -> None: assert isinstance(event, (rfxtrxmod.SensorEvent, rfxtrxmod.ControlEvent)) if event.values.get("Command") in COMMAND_ON_LIST: self._attr_is_on = True @@ -183,7 +188,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): elif event.values.get("Sensor Status") in SENSOR_STATUS_OFF: self._attr_is_on = False - def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): + def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None: """Apply command from rfxtrx.""" super()._apply_event(event) if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4: @@ -192,7 +197,9 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): self._apply_event_standard(event) @callback - def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): + def _handle_event( + self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple + ) -> None: """Check if event applies to me and update.""" if not self._event_applies(event, device_id): return @@ -215,7 +222,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): if self.is_on and self._off_delay is not None: @callback - def off_delay_listener(now): + def off_delay_listener(now: Any) -> None: """Switch device off after a delay.""" self._delay_listener = None self._attr_is_on = False diff --git a/homeassistant/components/rfxtrx/config_flow.py b/homeassistant/components/rfxtrx/config_flow.py index 508ca9a7037..7b5fdc08261 100644 --- a/homeassistant/components/rfxtrx/config_flow.py +++ b/homeassistant/components/rfxtrx/config_flow.py @@ -4,14 +4,14 @@ from __future__ import annotations import copy import itertools import os -from typing import TypedDict, cast +from typing import Any, TypedDict, cast import RFXtrx as rfxtrxmod import serial import serial.tools.list_ports import voluptuous as vol -from homeassistant import config_entries, exceptions +from homeassistant import config_entries, data_entry_flow, exceptions from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( CONF_COMMAND_OFF, @@ -64,7 +64,7 @@ class DeviceData(TypedDict): device_id: DeviceTuple -def none_or_int(value, base): +def none_or_int(value: str | None, base: int) -> int | None: """Check if strin is one otherwise convert to int.""" if value is None: return None @@ -80,17 +80,21 @@ class OptionsFlow(config_entries.OptionsFlow): def __init__(self, config_entry: ConfigEntry) -> None: """Initialize rfxtrx options flow.""" self._config_entry = config_entry - self._global_options = None - self._selected_device = None + self._global_options: dict[str, Any] = {} + self._selected_device: dict[str, Any] = {} self._selected_device_entry_id: str | None = None self._selected_device_event_code: str | None = None self._selected_device_object: rfxtrxmod.RFXtrxEvent | None = None - async def async_step_init(self, user_input=None): + async def async_step_init( + self, user_input: dict[str, Any] | None = None + ) -> data_entry_flow.FlowResult: """Manage the options.""" return await self.async_step_prompt_options() - async def async_step_prompt_options(self, user_input=None): + async def async_step_prompt_options( + self, user_input: dict[str, Any] | None = None + ) -> data_entry_flow.FlowResult: """Prompt for options.""" errors = {} @@ -103,7 +107,8 @@ class OptionsFlow(config_entries.OptionsFlow): entry_id = user_input[CONF_DEVICE] device_data = self._get_device_data(entry_id) self._selected_device_entry_id = entry_id - event_code = device_data[CONF_EVENT_CODE] + event_code = device_data["event_code"] + assert event_code self._selected_device_event_code = event_code self._selected_device = self._config_entry.data[CONF_DEVICES][ event_code @@ -111,7 +116,9 @@ class OptionsFlow(config_entries.OptionsFlow): self._selected_device_object = get_rfx_object(event_code) return await self.async_step_set_device_options() if CONF_EVENT_CODE in user_input: - self._selected_device_event_code = user_input[CONF_EVENT_CODE] + self._selected_device_event_code = cast( + str, user_input[CONF_EVENT_CODE] + ) self._selected_device = {} selected_device_object = get_rfx_object( self._selected_device_event_code @@ -159,13 +166,17 @@ class OptionsFlow(config_entries.OptionsFlow): step_id="prompt_options", data_schema=vol.Schema(options), errors=errors ) - async def async_step_set_device_options(self, user_input=None): + async def async_step_set_device_options( + self, user_input: dict[str, Any] | None = None + ) -> data_entry_flow.FlowResult: """Manage device options.""" errors = {} + assert self._selected_device_object + assert self._selected_device_event_code if user_input is not None: - assert self._selected_device_object - assert self._selected_device_event_code + devices: dict[str, dict[str, Any] | None] = {} + device: dict[str, Any] device_id = get_device_id( self._selected_device_object.device, data_bits=user_input.get(CONF_DATA_BITS), @@ -278,16 +289,16 @@ class OptionsFlow(config_entries.OptionsFlow): ), } ) - devices = { + replace_devices = { entry.id: entry.name_by_user if entry.name_by_user else entry.name for entry in self._device_entries if self._can_replace_device(entry.id) } - if devices: + if replace_devices: data_schema.update( { - vol.Optional(CONF_REPLACE_DEVICE): vol.In(devices), + vol.Optional(CONF_REPLACE_DEVICE): vol.In(replace_devices), } ) @@ -297,11 +308,13 @@ class OptionsFlow(config_entries.OptionsFlow): errors=errors, ) - async def _async_replace_device(self, replace_device): + async def _async_replace_device(self, replace_device: str) -> None: """Migrate properties of a device into another.""" device_registry = self._device_registry old_device = self._selected_device_entry_id + assert old_device old_entry = device_registry.async_get(old_device) + assert old_entry device_registry.async_update_device( replace_device, area_id=old_entry.area_id, @@ -343,23 +356,29 @@ class OptionsFlow(config_entries.OptionsFlow): device_registry.async_remove_device(old_device) - def _can_add_device(self, new_rfx_obj): + def _can_add_device(self, new_rfx_obj: rfxtrxmod.RFXtrxEvent) -> bool: """Check if device does not already exist.""" new_device_id = get_device_id(new_rfx_obj.device) for packet_id, entity_info in self._config_entry.data[CONF_DEVICES].items(): rfx_obj = get_rfx_object(packet_id) + assert rfx_obj + device_id = get_device_id(rfx_obj.device, entity_info.get(CONF_DATA_BITS)) if new_device_id == device_id: return False return True - def _can_replace_device(self, entry_id): + def _can_replace_device(self, entry_id: str) -> bool: """Check if device can be replaced with selected device.""" + assert self._selected_device_object + device_data = self._get_device_data(entry_id) - if (event_code := device_data[CONF_EVENT_CODE]) is not None: + if (event_code := device_data["event_code"]) is not None: rfx_obj = get_rfx_object(event_code) + assert rfx_obj + if ( rfx_obj.device.packettype == self._selected_device_object.device.packettype @@ -371,12 +390,12 @@ class OptionsFlow(config_entries.OptionsFlow): return False - def _get_device_event_code(self, entry_id): + def _get_device_event_code(self, entry_id: str) -> str | None: data = self._get_device_data(entry_id) - return data[CONF_EVENT_CODE] + return data["event_code"] - def _get_device_data(self, entry_id) -> DeviceData: + def _get_device_data(self, entry_id: str) -> DeviceData: """Get event code based on device identifier.""" event_code: str | None = None entry = self._device_registry.async_get(entry_id) @@ -390,7 +409,11 @@ class OptionsFlow(config_entries.OptionsFlow): return DeviceData(event_code=event_code, device_id=device_id) @callback - def update_config_data(self, global_options=None, devices=None): + def update_config_data( + self, + global_options: dict[str, Any] | None = None, + devices: dict[str, Any] | None = None, + ) -> None: """Update data in ConfigEntry.""" entry_data = self._config_entry.data.copy() entry_data[CONF_DEVICES] = copy.deepcopy(self._config_entry.data[CONF_DEVICES]) @@ -413,12 +436,14 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): VERSION = 1 - async def async_step_user(self, user_input=None): + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> data_entry_flow.FlowResult: """Step when user initializes a integration.""" await self.async_set_unique_id(DOMAIN) self._abort_if_unique_id_configured() - errors = {} + errors: dict[str, str] = {} if user_input is not None: if user_input[CONF_TYPE] == "Serial": return await self.async_step_setup_serial() @@ -430,9 +455,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): schema = vol.Schema({vol.Required(CONF_TYPE): vol.In(list_of_types)}) return self.async_show_form(step_id="user", data_schema=schema, errors=errors) - async def async_step_setup_network(self, user_input=None): + async def async_step_setup_network( + self, user_input: dict[str, Any] | None = None + ) -> data_entry_flow.FlowResult: """Step when setting up network configuration.""" - errors = {} + errors: dict[str, str] = {} if user_input is not None: host = user_input[CONF_HOST] @@ -455,9 +482,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): errors=errors, ) - async def async_step_setup_serial(self, user_input=None): + async def async_step_setup_serial( + self, user_input: dict[str, Any] | None = None + ) -> data_entry_flow.FlowResult: """Step when setting up serial configuration.""" - errors = {} + errors: dict[str, str] = {} if user_input is not None: user_selection = user_input[CONF_DEVICE] @@ -493,9 +522,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): errors=errors, ) - async def async_step_setup_serial_manual_path(self, user_input=None): + async def async_step_setup_serial_manual_path( + self, user_input: dict[str, Any] | None = None + ) -> data_entry_flow.FlowResult: """Select path manually.""" - errors = {} + errors: dict[str, str] = {} if user_input is not None: device = user_input[CONF_DEVICE] @@ -514,7 +545,12 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): errors=errors, ) - async def async_validate_rfx(self, host=None, port=None, device=None): + async def async_validate_rfx( + self, + host: str | None = None, + port: int | None = None, + device: str | None = None, + ) -> dict[str, Any]: """Create data for rfxtrx entry.""" success = await self.hass.async_add_executor_job( _test_transport, host, port, device @@ -522,7 +558,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): if not success: raise CannotConnect - data = { + data: dict[str, Any] = { CONF_HOST: host, CONF_PORT: port, CONF_DEVICE: device, @@ -538,7 +574,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): return OptionsFlow(config_entry) -def _test_transport(host, port, device): +def _test_transport(host: str | None, port: int | None, device: str | None) -> bool: """Construct a rfx object based on config.""" if port is not None: try: diff --git a/homeassistant/components/rfxtrx/cover.py b/homeassistant/components/rfxtrx/cover.py index 5e1788194f5..532e41ac50c 100644 --- a/homeassistant/components/rfxtrx/cover.py +++ b/homeassistant/components/rfxtrx/cover.py @@ -10,6 +10,7 @@ from homeassistant.components.cover import CoverEntity, CoverEntityFeature from homeassistant.config_entries import ConfigEntry from homeassistant.const import STATE_OPEN from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity_platform import AddEntitiesCallback from . import DeviceTuple, RfxtrxCommandEntity, async_setup_platform_entry @@ -24,9 +25,9 @@ from .const import ( _LOGGER = logging.getLogger(__name__) -def supported(event: rfxtrxmod.RFXtrxEvent): +def supported(event: rfxtrxmod.RFXtrxEvent) -> bool: """Return whether an event supports cover.""" - return event.device.known_to_be_rollershutter + return bool(event.device.known_to_be_rollershutter) async def async_setup_entry( @@ -40,8 +41,8 @@ async def async_setup_entry( event: rfxtrxmod.RFXtrxEvent, auto: rfxtrxmod.RFXtrxEvent | None, device_id: DeviceTuple, - entity_info: dict, - ): + entity_info: dict[str, Any], + ) -> list[Entity]: return [ RfxtrxCover( event.device, @@ -144,7 +145,7 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity): self._attr_is_closed = False self.async_write_ha_state() - def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): + def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None: """Apply command from rfxtrx.""" assert isinstance(event, rfxtrxmod.ControlEvent) super()._apply_event(event) @@ -154,7 +155,9 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity): self._attr_is_closed = True @callback - def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): + def _handle_event( + self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple + ) -> None: """Check if event applies to me and update.""" if device_id != self._device_id: return diff --git a/homeassistant/components/rfxtrx/device_action.py b/homeassistant/components/rfxtrx/device_action.py index e8fc8c6707d..15595b88cd2 100644 --- a/homeassistant/components/rfxtrx/device_action.py +++ b/homeassistant/components/rfxtrx/device_action.py @@ -1,6 +1,8 @@ """Provides device automations for RFXCOM RFXtrx.""" from __future__ import annotations +from collections.abc import Callable + import voluptuous as vol from homeassistant.components.device_automation.exceptions import ( @@ -65,7 +67,9 @@ async def async_get_actions( return actions -def _get_commands(hass, device_id, action_type): +def _get_commands( + hass: HomeAssistant, device_id: str, action_type: str +) -> tuple[dict[str, str], Callable[..., None]]: device = async_get_device_object(hass, device_id) send_fun = getattr(device, action_type) commands = getattr(device, ACTION_SELECTION[action_type], {}) diff --git a/homeassistant/components/rfxtrx/helpers.py b/homeassistant/components/rfxtrx/helpers.py index 7e567cff1cd..2badc6d4ca5 100644 --- a/homeassistant/components/rfxtrx/helpers.py +++ b/homeassistant/components/rfxtrx/helpers.py @@ -1,14 +1,14 @@ """Provides helpers for RFXtrx.""" -from RFXtrx import get_device +from RFXtrx import RFXtrxDevice, get_device from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import device_registry as dr @callback -def async_get_device_object(hass: HomeAssistant, device_id): +def async_get_device_object(hass: HomeAssistant, device_id: str) -> RFXtrxDevice: """Get a device for the given device registry id.""" device_registry = dr.async_get(hass) registry_device = device_registry.async_get(device_id) diff --git a/homeassistant/components/rfxtrx/light.py b/homeassistant/components/rfxtrx/light.py index 7f32ee0bc83..ad84515d41d 100644 --- a/homeassistant/components/rfxtrx/light.py +++ b/homeassistant/components/rfxtrx/light.py @@ -10,6 +10,7 @@ from homeassistant.components.light import ATTR_BRIGHTNESS, ColorMode, LightEnti from homeassistant.config_entries import ConfigEntry from homeassistant.const import STATE_ON from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity_platform import AddEntitiesCallback from . import DeviceTuple, RfxtrxCommandEntity, async_setup_platform_entry @@ -18,7 +19,7 @@ from .const import COMMAND_OFF_LIST, COMMAND_ON_LIST _LOGGER = logging.getLogger(__name__) -def supported(event: rfxtrxmod.RFXtrxEvent): +def supported(event: rfxtrxmod.RFXtrxEvent) -> bool: """Return whether an event supports light.""" return ( isinstance(event.device, rfxtrxmod.LightingDevice) @@ -37,8 +38,8 @@ async def async_setup_entry( event: rfxtrxmod.RFXtrxEvent, auto: rfxtrxmod.RFXtrxEvent | None, device_id: DeviceTuple, - entity_info: dict, - ): + entity_info: dict[str, Any], + ) -> list[Entity]: return [ RfxtrxLight( event.device, @@ -91,7 +92,7 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity): self._attr_brightness = 0 self.async_write_ha_state() - def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): + def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None: """Apply command from rfxtrx.""" assert isinstance(event, rfxtrxmod.ControlEvent) super()._apply_event(event) @@ -105,7 +106,9 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity): self._attr_is_on = brightness > 0 @callback - def _handle_event(self, event, device_id): + def _handle_event( + self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple + ) -> None: """Check if event applies to me and update.""" if device_id != self._device_id: return diff --git a/homeassistant/components/rfxtrx/sensor.py b/homeassistant/components/rfxtrx/sensor.py index c3524b022f8..b4d4d65295c 100644 --- a/homeassistant/components/rfxtrx/sensor.py +++ b/homeassistant/components/rfxtrx/sensor.py @@ -3,9 +3,12 @@ from __future__ import annotations from collections.abc import Callable from dataclasses import dataclass +from datetime import date, datetime +from decimal import Decimal import logging +from typing import Any, cast -from RFXtrx import ControlEvent, RFXtrxEvent, SensorEvent +from RFXtrx import ControlEvent, RFXtrxDevice, RFXtrxEvent, SensorEvent from homeassistant.components.sensor import ( SensorDeviceClass, @@ -30,8 +33,9 @@ from homeassistant.const import ( UV_INDEX, ) from homeassistant.core import HomeAssistant, callback -from homeassistant.helpers.entity import EntityCategory +from homeassistant.helpers.entity import Entity, EntityCategory from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.typing import StateType from . import DeviceTuple, RfxtrxEntity, async_setup_platform_entry, get_rfx_object from .const import ATTR_EVENT @@ -39,14 +43,14 @@ from .const import ATTR_EVENT _LOGGER = logging.getLogger(__name__) -def _battery_convert(value): +def _battery_convert(value: int | None) -> int | None: """Battery is given as a value between 0 and 9.""" if value is None: return None return (value + 1) * 10 -def _rssi_convert(value): +def _rssi_convert(value: int | None) -> str | None: """Rssi is given as dBm value.""" if value is None: return None @@ -57,7 +61,9 @@ def _rssi_convert(value): class RfxtrxSensorEntityDescription(SensorEntityDescription): """Description of sensor entities.""" - convert: Callable = lambda x: x + convert: Callable[[Any], StateType | date | datetime | Decimal] = lambda x: cast( + StateType, x + ) SENSOR_TYPES = ( @@ -243,16 +249,16 @@ async def async_setup_entry( ) -> None: """Set up config entry.""" - def _supported(event): + def _supported(event: RFXtrxEvent) -> bool: return isinstance(event, (ControlEvent, SensorEvent)) def _constructor( event: RFXtrxEvent, auto: RFXtrxEvent | None, device_id: DeviceTuple, - entity_info: dict, - ): - entities: list[RfxtrxSensor] = [] + entity_info: dict[str, Any], + ) -> list[Entity]: + entities: list[Entity] = [] for data_type in set(event.values) & set(SENSOR_TYPES_DICT): entities.append( RfxtrxSensor( @@ -271,14 +277,22 @@ async def async_setup_entry( class RfxtrxSensor(RfxtrxEntity, SensorEntity): - """Representation of a RFXtrx sensor.""" + """Representation of a RFXtrx sensor. + + Since all repeated events have meaning, these types of sensors + need to have force update enabled. + """ _attr_force_update = True - """We should force updates. Repeated states have meaning.""" - entity_description: RfxtrxSensorEntityDescription - def __init__(self, device, device_id, entity_description, event=None): + def __init__( + self, + device: RFXtrxDevice, + device_id: DeviceTuple, + entity_description: RfxtrxSensorEntityDescription, + event: RFXtrxEvent | None = None, + ) -> None: """Initialize the sensor.""" super().__init__(device, device_id, event=event) self.entity_description = entity_description @@ -296,7 +310,7 @@ class RfxtrxSensor(RfxtrxEntity, SensorEntity): self._apply_event(get_rfx_object(event)) @property - def native_value(self): + def native_value(self) -> StateType | date | datetime | Decimal: """Return the state of the sensor.""" if not self._event: return None @@ -304,7 +318,7 @@ class RfxtrxSensor(RfxtrxEntity, SensorEntity): return self.entity_description.convert(value) @callback - def _handle_event(self, event, device_id): + def _handle_event(self, event: RFXtrxEvent, device_id: DeviceTuple) -> None: """Check if event applies to me and update.""" if device_id != self._device_id: return diff --git a/homeassistant/components/rfxtrx/siren.py b/homeassistant/components/rfxtrx/siren.py index 0b49a7d4d8c..c9f10febb6b 100644 --- a/homeassistant/components/rfxtrx/siren.py +++ b/homeassistant/components/rfxtrx/siren.py @@ -58,8 +58,8 @@ async def async_setup_entry( event: rfxtrxmod.RFXtrxEvent, auto: rfxtrxmod.RFXtrxEvent | None, device_id: DeviceTuple, - entity_info: dict, - ): + entity_info: dict[str, Any], + ) -> list[Entity]: """Construct a entity from an event.""" device = event.device @@ -85,6 +85,7 @@ async def async_setup_entry( auto, ) ] + return [] await async_setup_platform_entry( hass, config_entry, async_add_entities, supported, _constructor diff --git a/homeassistant/components/rfxtrx/switch.py b/homeassistant/components/rfxtrx/switch.py index c73b0ba3b1d..edc34aeb80d 100644 --- a/homeassistant/components/rfxtrx/switch.py +++ b/homeassistant/components/rfxtrx/switch.py @@ -10,6 +10,7 @@ from homeassistant.components.switch import SwitchEntity from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_COMMAND_OFF, CONF_COMMAND_ON, STATE_ON from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity_platform import AddEntitiesCallback from . import ( @@ -31,7 +32,7 @@ DATA_SWITCH = f"{DOMAIN}_switch" _LOGGER = logging.getLogger(__name__) -def supported(event): +def supported(event: rfxtrxmod.RFXtrxEvent) -> bool: """Return whether an event supports switch.""" return ( isinstance(event.device, rfxtrxmod.LightingDevice) @@ -52,8 +53,8 @@ async def async_setup_entry( event: rfxtrxmod.RFXtrxEvent, auto: rfxtrxmod.RFXtrxEvent | None, device_id: DeviceTuple, - entity_info: dict, - ): + entity_info: dict[str, Any], + ) -> list[Entity]: return [ RfxtrxSwitch( event.device, @@ -97,7 +98,7 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity): if old_state is not None: self._attr_is_on = old_state.state == STATE_ON - def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent): + def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent) -> None: """Apply event for a lighting 4 device.""" if self._data_bits is not None: cmdstr = get_pt2262_cmd(event.device.id_string, self._data_bits) diff --git a/mypy.ini b/mypy.ini index 827d47d12ba..c77934d958a 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1882,6 +1882,16 @@ disallow_untyped_defs = true warn_return_any = true warn_unreachable = true +[mypy-homeassistant.components.rfxtrx.*] +check_untyped_defs = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +warn_return_any = true +warn_unreachable = true + [mypy-homeassistant.components.rhasspy.*] check_untyped_defs = true disallow_incomplete_defs = true