Reduce boilerplate code in entry init of rfxtrx (#58844)

* Reduce boilerplate code for rfxtrx

* Use rfxtrx built in to construct event

* Fixup mypy after rebase

* Also fix callable import
This commit is contained in:
Joakim Plate
2021-12-22 22:38:55 +01:00
committed by GitHub
parent 91a8b1e7b3
commit bda1f02371
7 changed files with 180 additions and 358 deletions

View File

@@ -5,7 +5,7 @@ from collections.abc import Callable
from dataclasses import dataclass
import logging
from RFXtrx import ControlEvent, SensorEvent
from RFXtrx import ControlEvent, RFXtrxEvent, SensorEvent
from homeassistant.components.sensor import (
SensorDeviceClass,
@@ -14,7 +14,6 @@ from homeassistant.components.sensor import (
SensorStateClass,
)
from homeassistant.const import (
CONF_DEVICES,
DEGREE,
ELECTRIC_CURRENT_AMPERE,
ELECTRIC_POTENTIAL_VOLT,
@@ -32,13 +31,7 @@ from homeassistant.const import (
from homeassistant.core import callback
from homeassistant.helpers.entity import EntityCategory
from . import (
CONF_DATA_BITS,
RfxtrxEntity,
connect_auto_add,
get_device_id,
get_rfx_object,
)
from . import DeviceTuple, RfxtrxEntity, async_setup_platform_entry, get_rfx_object
from .const import ATTR_EVENT
_LOGGER = logging.getLogger(__name__)
@@ -219,62 +212,33 @@ async def async_setup_entry(
config_entry,
async_add_entities,
):
"""Set up platform."""
discovery_info = config_entry.data
data_ids = set()
"""Set up config entry."""
def supported(event):
def _supported(event):
return isinstance(event, (ControlEvent, SensorEvent))
entities = []
for packet_id, entity_info in discovery_info[CONF_DEVICES].items():
if (event := get_rfx_object(packet_id)) is None:
_LOGGER.error("Invalid device: %s", packet_id)
continue
if not supported(event):
continue
device_id = get_device_id(
event.device, data_bits=entity_info.get(CONF_DATA_BITS)
)
def _constructor(
event: RFXtrxEvent,
auto: bool,
device_id: DeviceTuple,
entity_info: dict,
):
entities: list[RfxtrxSensor] = []
for data_type in set(event.values) & set(SENSOR_TYPES_DICT):
data_id = (*device_id, str(data_type))
if data_id in data_ids:
continue
data_ids.add(data_id)
entity = RfxtrxSensor(event.device, device_id, SENSOR_TYPES_DICT[data_type])
entities.append(entity)
async_add_entities(entities)
@callback
def sensor_update(event, device_id):
"""Handle sensor updates from the RFXtrx gateway."""
if not supported(event):
return
for data_type in set(event.values) & set(SENSOR_TYPES_DICT):
data_id = (*device_id, data_type)
if data_id in data_ids:
continue
data_ids.add(data_id)
_LOGGER.info(
"Added sensor (Device ID: %s Class: %s Sub: %s, Event: %s)",
event.device.id_string.lower(),
event.device.__class__.__name__,
event.device.subtype,
"".join(f"{x:02x}" for x in event.data),
entities.append(
RfxtrxSensor(
event.device,
device_id,
SENSOR_TYPES_DICT[data_type],
event=event if auto else None,
)
)
entity = RfxtrxSensor(
event.device, device_id, SENSOR_TYPES_DICT[data_type], event=event
)
async_add_entities([entity])
return entities
# Subscribe to main RFXtrx events
connect_auto_add(hass, discovery_info, sensor_update)
await async_setup_platform_entry(
hass, config_entry, async_add_entities, _supported, _constructor
)
class RfxtrxSensor(RfxtrxEntity, SensorEntity):