ESPHome move ReconnectLogic to aioesphomeapi (#57601)

This commit is contained in:
Otto Winter 2021-10-13 19:04:23 +02:00 committed by GitHub
parent 628e59ff11
commit df4e8721e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 25 additions and 264 deletions

View File

@ -1,8 +1,6 @@
"""Support for esphome devices.""" """Support for esphome devices."""
from __future__ import annotations from __future__ import annotations
import asyncio
from collections.abc import Awaitable
from dataclasses import dataclass, field from dataclasses import dataclass, field
import functools import functools
import logging import logging
@ -19,12 +17,12 @@ from aioesphomeapi import (
EntityState, EntityState,
HomeassistantServiceCall, HomeassistantServiceCall,
InvalidEncryptionKeyAPIError, InvalidEncryptionKeyAPIError,
ReconnectLogic,
RequiresEncryptionAPIError, RequiresEncryptionAPIError,
UserService, UserService,
UserServiceArgType, UserServiceArgType,
) )
import voluptuous as vol import voluptuous as vol
from zeroconf import DNSPointer, RecordUpdate, RecordUpdateListener, Zeroconf
from homeassistant import const from homeassistant import const
from homeassistant.components import zeroconf from homeassistant.components import zeroconf
@ -119,7 +117,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
zeroconf_instance = await zeroconf.async_get_instance(hass) zeroconf_instance = await zeroconf.async_get_instance(hass)
cli = APIClient( cli = APIClient(
hass.loop,
host, host,
port, port,
password, password,
@ -259,7 +256,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
_send_home_assistant_state(entity_id, attribute, hass.states.get(entity_id)) _send_home_assistant_state(entity_id, attribute, hass.states.get(entity_id))
) )
async def on_login() -> None: async def on_connect() -> None:
"""Subscribe to states and list entities on successful API login.""" """Subscribe to states and list entities on successful API login."""
nonlocal device_id nonlocal device_id
try: try:
@ -285,8 +282,26 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
# Re-connection logic will trigger after this # Re-connection logic will trigger after this
await cli.disconnect() await cli.disconnect()
async def on_disconnect() -> None:
"""Run disconnect callbacks on API disconnect."""
for disconnect_cb in entry_data.disconnect_callbacks:
disconnect_cb()
entry_data.disconnect_callbacks = []
entry_data.available = False
entry_data.async_update_device_state(hass)
async def on_connect_error(err: Exception) -> None:
"""Start reauth flow if appropriate connect error type."""
if isinstance(err, (RequiresEncryptionAPIError, InvalidEncryptionKeyAPIError)):
entry.async_start_reauth(hass)
reconnect_logic = ReconnectLogic( reconnect_logic = ReconnectLogic(
hass, cli, entry, host, on_login, zeroconf_instance client=cli,
on_connect=on_connect,
on_disconnect=on_disconnect,
zeroconf_instance=zeroconf_instance,
name=host,
on_connect_error=on_connect_error,
) )
async def complete_setup() -> None: async def complete_setup() -> None:
@ -302,258 +317,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return True return True
class ReconnectLogic(RecordUpdateListener):
"""Reconnectiong logic handler for ESPHome config entries.
Contains two reconnect strategies:
- Connect with increasing time between connection attempts.
- Listen to zeroconf mDNS records, if any records are found for this device, try reconnecting immediately.
"""
def __init__(
self,
hass: HomeAssistant,
cli: APIClient,
entry: ConfigEntry,
host: str,
on_login: Callable[[], Awaitable[None]],
zc: Zeroconf,
) -> None:
"""Initialize ReconnectingLogic."""
self._hass = hass
self._cli = cli
self._entry = entry
self._host = host
self._on_login = on_login
self._zc = zc
# Flag to check if the device is connected
self._connected = True
self._connected_lock = asyncio.Lock()
self._zc_lock = asyncio.Lock()
self._zc_listening = False
# Event the different strategies use for issuing a reconnect attempt.
self._reconnect_event = asyncio.Event()
# The task containing the infinite reconnect loop while running
self._loop_task: asyncio.Task[None] | None = None
# How many reconnect attempts have there been already, used for exponential wait time
self._tries = 0
self._tries_lock = asyncio.Lock()
# Track the wait task to cancel it on HA shutdown
self._wait_task: asyncio.Task[None] | None = None
self._wait_task_lock = asyncio.Lock()
@property
def _entry_data(self) -> RuntimeEntryData | None:
domain_data = DomainData.get(self._hass)
try:
return domain_data.get_entry_data(self._entry)
except KeyError:
return None
async def _on_disconnect(self) -> None:
"""Log and issue callbacks when disconnecting."""
if self._entry_data is None:
return
# This can happen often depending on WiFi signal strength.
# So therefore all these connection warnings are logged
# as infos. The "unavailable" logic will still trigger so the
# user knows if the device is not connected.
_LOGGER.info("Disconnected from ESPHome API for %s", self._host)
# Run disconnect hooks
for disconnect_cb in self._entry_data.disconnect_callbacks:
disconnect_cb()
self._entry_data.disconnect_callbacks = []
self._entry_data.available = False
self._entry_data.async_update_device_state(self._hass)
await self._start_zc_listen()
# Reset tries
async with self._tries_lock:
self._tries = 0
# Connected needs to be reset before the reconnect event (opposite order of check)
async with self._connected_lock:
self._connected = False
self._reconnect_event.set()
async def _wait_and_start_reconnect(self) -> None:
"""Wait for exponentially increasing time to issue next reconnect event."""
async with self._tries_lock:
tries = self._tries
# If not first re-try, wait and print message
# Cap wait time at 1 minute. This is because while working on the
# device (e.g. soldering stuff), users don't want to have to wait
# a long time for their device to show up in HA again (this was
# mentioned a lot in early feedback)
tries = min(tries, 10) # prevent OverflowError
wait_time = int(round(min(1.8 ** tries, 60.0)))
if tries == 1:
_LOGGER.info("Trying to reconnect to %s in the background", self._host)
_LOGGER.debug("Retrying %s in %d seconds", self._host, wait_time)
await asyncio.sleep(wait_time)
async with self._wait_task_lock:
self._wait_task = None
self._reconnect_event.set()
async def _try_connect(self) -> None:
"""Try connecting to the API client."""
async with self._tries_lock:
tries = self._tries
self._tries += 1
try:
await self._cli.connect(on_stop=self._on_disconnect, login=True)
except APIConnectionError as error:
if isinstance(
error, (RequiresEncryptionAPIError, InvalidEncryptionKeyAPIError)
):
self._entry.async_start_reauth(self._hass)
level = logging.WARNING if tries == 0 else logging.DEBUG
_LOGGER.log(
level,
"Can't connect to ESPHome API for %s (%s): %s",
self._entry.unique_id,
self._host,
error,
)
await self._start_zc_listen()
# Schedule re-connect in event loop in order not to delay HA
# startup. First connect is scheduled in tracked tasks.
async with self._wait_task_lock:
# Allow only one wait task at a time
# can happen if mDNS record received while waiting, then use existing wait task
if self._wait_task is not None:
return
self._wait_task = self._hass.loop.create_task(
self._wait_and_start_reconnect()
)
else:
_LOGGER.info("Successfully connected to %s", self._host)
async with self._tries_lock:
self._tries = 0
async with self._connected_lock:
self._connected = True
await self._stop_zc_listen()
self._hass.async_create_task(self._on_login())
async def _reconnect_once(self) -> None:
# Wait and clear reconnection event
await self._reconnect_event.wait()
self._reconnect_event.clear()
# If in connected state, do not try to connect again.
async with self._connected_lock:
if self._connected:
return
# Check if the entry got removed or disabled, in which case we shouldn't reconnect
if not DomainData.get(self._hass).is_entry_loaded(self._entry):
# When removing/disconnecting manually
return
device_registry = self._hass.helpers.device_registry.async_get(self._hass)
devices = dr.async_entries_for_config_entry(
device_registry, self._entry.entry_id
)
for device in devices:
# There is only one device in ESPHome
if device.disabled:
# Don't attempt to connect if it's disabled
return
await self._try_connect()
async def _reconnect_loop(self) -> None:
while True:
try:
await self._reconnect_once()
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception: # pylint: disable=broad-except
_LOGGER.error("Caught exception while reconnecting", exc_info=True)
async def start(self) -> None:
"""Start the reconnecting logic background task."""
# Create reconnection loop outside of HA's tracked tasks in order
# not to delay startup.
self._loop_task = self._hass.loop.create_task(self._reconnect_loop())
async with self._connected_lock:
self._connected = False
self._reconnect_event.set()
async def stop(self) -> None:
"""Stop the reconnecting logic background task. Does not disconnect the client."""
if self._loop_task is not None:
self._loop_task.cancel()
self._loop_task = None
async with self._wait_task_lock:
if self._wait_task is not None:
self._wait_task.cancel()
self._wait_task = None
await self._stop_zc_listen()
async def _start_zc_listen(self) -> None:
"""Listen for mDNS records.
This listener allows us to schedule a reconnect as soon as a
received mDNS record indicates the node is up again.
"""
async with self._zc_lock:
if not self._zc_listening:
self._zc.async_add_listener(self, None)
self._zc_listening = True
async def _stop_zc_listen(self) -> None:
"""Stop listening for zeroconf updates."""
async with self._zc_lock:
if self._zc_listening:
self._zc.async_remove_listener(self)
self._zc_listening = False
@callback
def stop_callback(self) -> None:
"""Stop as an async callback function."""
self._hass.async_create_task(self.stop())
def async_update_records(
self, zc: Zeroconf, now: float, records: list[RecordUpdate]
) -> None:
"""Listen to zeroconf updated mDNS records.
This is a mDNS record from the device and could mean it just woke up.
"""
# Check if already connected, no lock needed for this access and
# bail if either the entry was already teared down or we haven't received device info yet
if (
self._connected
or self._reconnect_event.is_set()
or self._entry_data is None
or self._entry_data.device_info is None
):
return
filter_alias = f"{self._entry_data.device_info.name}._esphomelib._tcp.local."
for record_update in records:
# We only consider PTR records and match using the alias name
if (
not isinstance(record_update.new, DNSPointer)
or record_update.new.alias != filter_alias
):
continue
# Tell reconnection logic to retry connection attempt now (even before reconnect timer finishes)
_LOGGER.debug(
"%s: Triggering reconnect because of received mDNS record %s",
self._host,
record_update.new,
)
self._reconnect_event.set()
return
async def _async_setup_device_registry( async def _async_setup_device_registry(
hass: HomeAssistant, entry: ConfigEntry, device_info: EsphomeDeviceInfo hass: HomeAssistant, entry: ConfigEntry, device_info: EsphomeDeviceInfo
) -> str: ) -> str:

View File

@ -260,7 +260,6 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN):
assert self._host is not None assert self._host is not None
assert self._port is not None assert self._port is not None
cli = APIClient( cli = APIClient(
self.hass.loop,
self._host, self._host,
self._port, self._port,
"", "",
@ -292,7 +291,6 @@ class EsphomeFlowHandler(ConfigFlow, domain=DOMAIN):
assert self._host is not None assert self._host is not None
assert self._port is not None assert self._port is not None
cli = APIClient( cli = APIClient(
self.hass.loop,
self._host, self._host,
self._port, self._port,
self._password, self._password,

View File

@ -3,7 +3,7 @@
"name": "ESPHome", "name": "ESPHome",
"config_flow": true, "config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/esphome", "documentation": "https://www.home-assistant.io/integrations/esphome",
"requirements": ["aioesphomeapi==9.1.5"], "requirements": ["aioesphomeapi==10.0.0"],
"zeroconf": ["_esphomelib._tcp.local."], "zeroconf": ["_esphomelib._tcp.local."],
"codeowners": ["@OttoWinter", "@jesserockz"], "codeowners": ["@OttoWinter", "@jesserockz"],
"after_dependencies": ["zeroconf", "tag"], "after_dependencies": ["zeroconf", "tag"],

View File

@ -161,7 +161,7 @@ aioeagle==1.1.0
aioemonitor==1.0.5 aioemonitor==1.0.5
# homeassistant.components.esphome # homeassistant.components.esphome
aioesphomeapi==9.1.5 aioesphomeapi==10.0.0
# homeassistant.components.flo # homeassistant.components.flo
aioflo==0.4.1 aioflo==0.4.1

View File

@ -109,7 +109,7 @@ aioeagle==1.1.0
aioemonitor==1.0.5 aioemonitor==1.0.5
# homeassistant.components.esphome # homeassistant.components.esphome
aioesphomeapi==9.1.5 aioesphomeapi==10.0.0
# homeassistant.components.flo # homeassistant.components.flo
aioflo==0.4.1 aioflo==0.4.1

View File

@ -33,7 +33,7 @@ def mock_client():
with patch("homeassistant.components.esphome.config_flow.APIClient") as mock_client: with patch("homeassistant.components.esphome.config_flow.APIClient") as mock_client:
def mock_constructor( def mock_constructor(
loop, host, port, password, zeroconf_instance=None, noise_psk=None host, port, password, zeroconf_instance=None, noise_psk=None
): ):
"""Fake the client constructor.""" """Fake the client constructor."""
mock_client.host = host mock_client.host = host