Asyncify rfxtrx startup and event handling (#38155)

* Asyncify startup and event handling

* Adjust linting error

* Must use the thread safe add_job function

* Switch to correct async function
This commit is contained in:
Joakim Plate 2020-07-24 14:40:10 +02:00 committed by GitHub
parent 686e6b8fc3
commit 5d28e109e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -172,7 +172,7 @@ async def async_setup_entry(hass, entry: config_entries.ConfigEntry):
"""Set up the RFXtrx component."""
hass.data.setdefault(DOMAIN, {})
await hass.async_add_executor_job(setup_internal, hass, entry)
await async_setup_internal(hass, entry)
for domain in DOMAINS:
hass.async_create_task(
@ -184,49 +184,68 @@ async def async_setup_entry(hass, entry: config_entries.ConfigEntry):
async def async_unload_entry(hass, entry: config_entries.ConfigEntry):
"""Unload RFXtrx component."""
unload_ok = all(
if not all(
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(entry, component)
for component in DOMAINS
]
)
)
):
return False
if unload_ok:
await hass.async_add_executor_job(unload_internal, hass, entry.data)
hass.data.pop(DOMAIN)
return unload_ok
def unload_internal(hass, config):
"""Unload the RFXtrx component."""
hass.services.remove(DOMAIN, SERVICE_SEND)
hass.services.async_remove(DOMAIN, SERVICE_SEND)
listener = hass.data[DOMAIN][DATA_LISTENER]
listener()
rfx_object = hass.data[DOMAIN][DATA_RFXOBJECT]
rfx_object.close_connection()
await hass.async_add_executor_job(rfx_object.close_connection)
return True
def setup_internal(hass, entry: config_entries.ConfigEntry):
"""Set up the RFXtrx component."""
config = entry.data
def _create_rfx(config):
"""Construct a rfx object based on config."""
if config[CONF_PORT] is not None:
# If port is set then we create a TCP connection
rfx = rfxtrxmod.Connect(
(config[CONF_HOST], config[CONF_PORT]),
None,
debug=config[CONF_DEBUG],
transport_protocol=rfxtrxmod.PyNetworkTransport,
)
else:
rfx = rfxtrxmod.Connect(config[CONF_DEVICE], None, debug=config[CONF_DEBUG])
# Setup some per device config
devices = dict()
for event_code, event_config in config[CONF_DEVICES].items():
return rfx
def _get_device_lookup(devices):
"""Get a lookup structure for devices."""
lookup = dict()
for event_code, event_config in devices.items():
event = get_rfx_object(event_code)
device_id = get_device_id(
event.device, data_bits=event_config.get(CONF_DATA_BITS)
)
devices[device_id] = event_config
lookup[device_id] = event_config
return lookup
async def async_setup_internal(hass, entry: config_entries.ConfigEntry):
"""Set up the RFXtrx component."""
config = entry.data
# Initialize library
rfx_object = await hass.async_add_executor_job(_create_rfx, config)
# Setup some per device config
devices = _get_device_lookup(config[CONF_DEVICES])
# Declare the Handle event
def handle_receive(event):
@callback
def async_handle_receive(event):
"""Handle received messages from RFXtrx gateway."""
# Log RFXCOM event
if not event.device.id_string:
@ -246,52 +265,40 @@ def setup_internal(hass, entry: config_entries.ConfigEntry):
data_bits = get_device_data_bits(event.device, devices)
device_id = get_device_id(event.device, data_bits=data_bits)
# Register new devices
if config[CONF_AUTOMATIC_ADD] and device_id not in devices:
_add_device(event, device_id)
# Callback to HA registered components.
hass.helpers.dispatcher.dispatcher_send(SIGNAL_EVENT, event, device_id)
hass.helpers.dispatcher.async_dispatcher_send(SIGNAL_EVENT, event, device_id)
# Signal event to any other listeners
fire_event = devices.get(device_id, {}).get(CONF_FIRE_EVENT)
if fire_event:
hass.bus.fire(EVENT_RFXTRX_EVENT, event_data)
hass.bus.async_fire(EVENT_RFXTRX_EVENT, event_data)
@callback
def device_update(event, device_id):
if device_id not in devices:
data = entry.data.copy()
event_code = binascii.hexlify(event.data).decode("ASCII")
data[CONF_DEVICES][event_code] = device_id
hass.config_entries.async_update_entry(entry=entry, data=data)
devices[device_id] = {}
if config[CONF_AUTOMATIC_ADD]:
hass.helpers.dispatcher.async_dispatcher_connect(SIGNAL_EVENT, device_update)
device = config[CONF_DEVICE]
host = config[CONF_HOST]
port = config[CONF_PORT]
debug = config[CONF_DEBUG]
if port is not None:
# If port is set then we create a TCP connection
rfx_object = rfxtrxmod.Connect(
(host, port),
None,
debug=debug,
transport_protocol=rfxtrxmod.PyNetworkTransport,
)
else:
rfx_object = rfxtrxmod.Connect(device, None, debug=debug)
def _add_device(event, device_id):
"""Add a device to config entry."""
data = entry.data.copy()
event_code = binascii.hexlify(event.data).decode("ASCII")
data[CONF_DEVICES][event_code] = device_id
hass.config_entries.async_update_entry(entry=entry, data=data)
devices[device_id] = {}
@callback
def _start_rfxtrx(event):
rfx_object.event_callback = handle_receive
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, _start_rfxtrx)
"""Start receiving events."""
rfx_object.event_callback = lambda event: hass.add_job(
async_handle_receive, event
)
def _shutdown_rfxtrx(event):
"""Close connection with RFXtrx."""
rfx_object.close_connection()
listener = hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _shutdown_rfxtrx)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, _start_rfxtrx)
listener = hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _shutdown_rfxtrx)
hass.data[DOMAIN][DATA_LISTENER] = listener
hass.data[DOMAIN][DATA_RFXOBJECT] = rfx_object
@ -300,7 +307,7 @@ def setup_internal(hass, entry: config_entries.ConfigEntry):
event = call.data[ATTR_EVENT]
rfx_object.transport.send(event)
hass.services.register(DOMAIN, SERVICE_SEND, send, schema=SERVICE_SEND_SCHEMA)
hass.services.async_register(DOMAIN, SERVICE_SEND, send, schema=SERVICE_SEND_SCHEMA)
def get_rfx_object(packetid):