diff --git a/homeassistant/components/evohome/__init__.py b/homeassistant/components/evohome/__init__.py index 133851ba1ea..08b65f42688 100644 --- a/homeassistant/components/evohome/__init__.py +++ b/homeassistant/components/evohome/__init__.py @@ -8,9 +8,7 @@ from __future__ import annotations from collections.abc import Awaitable from datetime import datetime, timedelta, timezone -from http import HTTPStatus import logging -import re from typing import Any, Final import evohomeasync as ev1 @@ -80,6 +78,13 @@ from .const import ( UTC_OFFSET, EvoService, ) +from .helpers import ( + convert_dict, + convert_until, + dt_aware_to_naive, + dt_local_to_aware, + handle_evo_exception, +) _LOGGER = logging.getLogger(__name__) @@ -117,98 +122,6 @@ SET_ZONE_OVERRIDE_SCHEMA: Final = vol.Schema( ) -def _dt_local_to_aware(dt_naive: datetime) -> datetime: - dt_aware = dt_util.now() + (dt_naive - datetime.now()) - if dt_aware.microsecond >= 500000: - dt_aware += timedelta(seconds=1) - return dt_aware.replace(microsecond=0) - - -def _dt_aware_to_naive(dt_aware: datetime) -> datetime: - dt_naive = datetime.now() + (dt_aware - dt_util.now()) - if dt_naive.microsecond >= 500000: - dt_naive += timedelta(seconds=1) - return dt_naive.replace(microsecond=0) - - -def convert_until(status_dict: dict, until_key: str) -> None: - """Reformat a dt str from "%Y-%m-%dT%H:%M:%SZ" as local/aware/isoformat.""" - if until_key in status_dict and ( # only present for certain modes - dt_utc_naive := dt_util.parse_datetime(status_dict[until_key]) - ): - status_dict[until_key] = dt_util.as_local(dt_utc_naive).isoformat() - - -def convert_dict(dictionary: dict[str, Any]) -> dict[str, Any]: - """Recursively convert a dict's keys to snake_case.""" - - def convert_key(key: str) -> str: - """Convert a string to snake_case.""" - string = re.sub(r"[\-\.\s]", "_", str(key)) - return ( - (string[0]).lower() - + re.sub( - r"[A-Z]", - lambda matched: f"_{matched.group(0).lower()}", # type:ignore[str-bytes-safe] - string[1:], - ) - ) - - return { - (convert_key(k) if isinstance(k, str) else k): ( - convert_dict(v) if isinstance(v, dict) else v - ) - for k, v in dictionary.items() - } - - -def _handle_exception(err: evo.RequestFailed) -> None: - """Return False if the exception can't be ignored.""" - - try: - raise err - - except evo.AuthenticationFailed: - _LOGGER.error( - ( - "Failed to authenticate with the vendor's server. Check your username" - " and password. NB: Some special password characters that work" - " correctly via the website will not work via the web API. Message" - " is: %s" - ), - err, - ) - - except evo.RequestFailed: - if err.status is None: - _LOGGER.warning( - ( - "Unable to connect with the vendor's server. " - "Check your network and the vendor's service status page. " - "Message is: %s" - ), - err, - ) - - elif err.status == HTTPStatus.SERVICE_UNAVAILABLE: - _LOGGER.warning( - "The vendor says their server is currently unavailable. " - "Check the vendor's service status page" - ) - - elif err.status == HTTPStatus.TOO_MANY_REQUESTS: - _LOGGER.warning( - ( - "The vendor's API rate limit has been exceeded. " - "If this message persists, consider increasing the %s" - ), - CONF_SCAN_INTERVAL, - ) - - else: - raise # we don't expect/handle any other Exceptions - - async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Create a (EMEA/EU-based) Honeywell TCC system.""" @@ -225,7 +138,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: if tokens.get(ACCESS_TOKEN_EXPIRES) is not None and ( expires := dt_util.parse_datetime(tokens[ACCESS_TOKEN_EXPIRES]) ): - tokens[ACCESS_TOKEN_EXPIRES] = _dt_aware_to_naive(expires) + tokens[ACCESS_TOKEN_EXPIRES] = dt_aware_to_naive(expires) user_data = tokens.pop(USER_DATA, {}) return (tokens, user_data) @@ -243,7 +156,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: try: await client_v2.login() except evo.AuthenticationFailed as err: - _handle_exception(err) + handle_evo_exception(err) return False finally: config[DOMAIN][CONF_PASSWORD] = "REDACTED" @@ -458,7 +371,7 @@ class EvoBroker: async def save_auth_tokens(self) -> None: """Save access tokens and session IDs to the store for later use.""" # evohomeasync2 uses naive/local datetimes - access_token_expires = _dt_local_to_aware( + access_token_expires = dt_local_to_aware( self.client.access_token_expires # type: ignore[arg-type] ) @@ -488,7 +401,7 @@ class EvoBroker: try: result = await client_api except evo.RequestFailed as err: - _handle_exception(err) + handle_evo_exception(err) return None if update_state: # wait a moment for system to quiesce before updating state @@ -563,7 +476,7 @@ class EvoBroker: try: status = await self._location.refresh_status() except evo.RequestFailed as err: - _handle_exception(err) + handle_evo_exception(err) else: async_dispatcher_send(self.hass, DOMAIN) _LOGGER.debug("Status = %s", status) diff --git a/homeassistant/components/evohome/helpers.py b/homeassistant/components/evohome/helpers.py new file mode 100644 index 00000000000..f84d2945779 --- /dev/null +++ b/homeassistant/components/evohome/helpers.py @@ -0,0 +1,110 @@ +"""Support for (EMEA/EU-based) Honeywell TCC systems.""" + +from __future__ import annotations + +from datetime import datetime, timedelta +from http import HTTPStatus +import logging +import re +from typing import Any + +import evohomeasync2 as evo + +from homeassistant.const import CONF_SCAN_INTERVAL +import homeassistant.util.dt as dt_util + +_LOGGER = logging.getLogger(__name__) + + +def dt_local_to_aware(dt_naive: datetime) -> datetime: + """Convert a local/naive datetime to TZ-aware.""" + dt_aware = dt_util.now() + (dt_naive - datetime.now()) + if dt_aware.microsecond >= 500000: + dt_aware += timedelta(seconds=1) + return dt_aware.replace(microsecond=0) + + +def dt_aware_to_naive(dt_aware: datetime) -> datetime: + """Convert a TZ-aware datetime to naive/local.""" + dt_naive = datetime.now() + (dt_aware - dt_util.now()) + if dt_naive.microsecond >= 500000: + dt_naive += timedelta(seconds=1) + return dt_naive.replace(microsecond=0) + + +def convert_until(status_dict: dict, until_key: str) -> None: + """Reformat a dt str from "%Y-%m-%dT%H:%M:%SZ" as local/aware/isoformat.""" + if until_key in status_dict and ( # only present for certain modes + dt_utc_naive := dt_util.parse_datetime(status_dict[until_key]) + ): + status_dict[until_key] = dt_util.as_local(dt_utc_naive).isoformat() + + +def convert_dict(dictionary: dict[str, Any]) -> dict[str, Any]: + """Recursively convert a dict's keys to snake_case.""" + + def convert_key(key: str) -> str: + """Convert a string to snake_case.""" + string = re.sub(r"[\-\.\s]", "_", str(key)) + return ( + (string[0]).lower() + + re.sub( + r"[A-Z]", + lambda matched: f"_{matched.group(0).lower()}", # type:ignore[str-bytes-safe] + string[1:], + ) + ) + + return { + (convert_key(k) if isinstance(k, str) else k): ( + convert_dict(v) if isinstance(v, dict) else v + ) + for k, v in dictionary.items() + } + + +def handle_evo_exception(err: evo.RequestFailed) -> None: + """Return False if the exception can't be ignored.""" + + try: + raise err + + except evo.AuthenticationFailed: + _LOGGER.error( + ( + "Failed to authenticate with the vendor's server. Check your username" + " and password. NB: Some special password characters that work" + " correctly via the website will not work via the web API. Message" + " is: %s" + ), + err, + ) + + except evo.RequestFailed: + if err.status is None: + _LOGGER.warning( + ( + "Unable to connect with the vendor's server. " + "Check your network and the vendor's service status page. " + "Message is: %s" + ), + err, + ) + + elif err.status == HTTPStatus.SERVICE_UNAVAILABLE: + _LOGGER.warning( + "The vendor says their server is currently unavailable. " + "Check the vendor's service status page" + ) + + elif err.status == HTTPStatus.TOO_MANY_REQUESTS: + _LOGGER.warning( + ( + "The vendor's API rate limit has been exceeded. " + "If this message persists, consider increasing the %s" + ), + CONF_SCAN_INTERVAL, + ) + + else: + raise # we don't expect/handle any other Exceptions