Move evohome helper functions to separate module (#118497)

initial commit
This commit is contained in:
David Bonnes 2024-05-31 09:22:15 +01:00 committed by GitHub
parent cdcf091c9c
commit 85d979847c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 122 additions and 99 deletions

View File

@ -8,9 +8,7 @@ from __future__ import annotations
from collections.abc import Awaitable
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
import logging
import re
from typing import Any, Final
import evohomeasync as ev1
@ -80,6 +78,13 @@ from .const import (
UTC_OFFSET,
EvoService,
)
from .helpers import (
convert_dict,
convert_until,
dt_aware_to_naive,
dt_local_to_aware,
handle_evo_exception,
)
_LOGGER = logging.getLogger(__name__)
@ -117,98 +122,6 @@ SET_ZONE_OVERRIDE_SCHEMA: Final = vol.Schema(
)
def _dt_local_to_aware(dt_naive: datetime) -> datetime:
dt_aware = dt_util.now() + (dt_naive - datetime.now())
if dt_aware.microsecond >= 500000:
dt_aware += timedelta(seconds=1)
return dt_aware.replace(microsecond=0)
def _dt_aware_to_naive(dt_aware: datetime) -> datetime:
dt_naive = datetime.now() + (dt_aware - dt_util.now())
if dt_naive.microsecond >= 500000:
dt_naive += timedelta(seconds=1)
return dt_naive.replace(microsecond=0)
def convert_until(status_dict: dict, until_key: str) -> None:
"""Reformat a dt str from "%Y-%m-%dT%H:%M:%SZ" as local/aware/isoformat."""
if until_key in status_dict and ( # only present for certain modes
dt_utc_naive := dt_util.parse_datetime(status_dict[until_key])
):
status_dict[until_key] = dt_util.as_local(dt_utc_naive).isoformat()
def convert_dict(dictionary: dict[str, Any]) -> dict[str, Any]:
"""Recursively convert a dict's keys to snake_case."""
def convert_key(key: str) -> str:
"""Convert a string to snake_case."""
string = re.sub(r"[\-\.\s]", "_", str(key))
return (
(string[0]).lower()
+ re.sub(
r"[A-Z]",
lambda matched: f"_{matched.group(0).lower()}", # type:ignore[str-bytes-safe]
string[1:],
)
)
return {
(convert_key(k) if isinstance(k, str) else k): (
convert_dict(v) if isinstance(v, dict) else v
)
for k, v in dictionary.items()
}
def _handle_exception(err: evo.RequestFailed) -> None:
"""Return False if the exception can't be ignored."""
try:
raise err
except evo.AuthenticationFailed:
_LOGGER.error(
(
"Failed to authenticate with the vendor's server. Check your username"
" and password. NB: Some special password characters that work"
" correctly via the website will not work via the web API. Message"
" is: %s"
),
err,
)
except evo.RequestFailed:
if err.status is None:
_LOGGER.warning(
(
"Unable to connect with the vendor's server. "
"Check your network and the vendor's service status page. "
"Message is: %s"
),
err,
)
elif err.status == HTTPStatus.SERVICE_UNAVAILABLE:
_LOGGER.warning(
"The vendor says their server is currently unavailable. "
"Check the vendor's service status page"
)
elif err.status == HTTPStatus.TOO_MANY_REQUESTS:
_LOGGER.warning(
(
"The vendor's API rate limit has been exceeded. "
"If this message persists, consider increasing the %s"
),
CONF_SCAN_INTERVAL,
)
else:
raise # we don't expect/handle any other Exceptions
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Create a (EMEA/EU-based) Honeywell TCC system."""
@ -225,7 +138,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
if tokens.get(ACCESS_TOKEN_EXPIRES) is not None and (
expires := dt_util.parse_datetime(tokens[ACCESS_TOKEN_EXPIRES])
):
tokens[ACCESS_TOKEN_EXPIRES] = _dt_aware_to_naive(expires)
tokens[ACCESS_TOKEN_EXPIRES] = dt_aware_to_naive(expires)
user_data = tokens.pop(USER_DATA, {})
return (tokens, user_data)
@ -243,7 +156,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
try:
await client_v2.login()
except evo.AuthenticationFailed as err:
_handle_exception(err)
handle_evo_exception(err)
return False
finally:
config[DOMAIN][CONF_PASSWORD] = "REDACTED"
@ -458,7 +371,7 @@ class EvoBroker:
async def save_auth_tokens(self) -> None:
"""Save access tokens and session IDs to the store for later use."""
# evohomeasync2 uses naive/local datetimes
access_token_expires = _dt_local_to_aware(
access_token_expires = dt_local_to_aware(
self.client.access_token_expires # type: ignore[arg-type]
)
@ -488,7 +401,7 @@ class EvoBroker:
try:
result = await client_api
except evo.RequestFailed as err:
_handle_exception(err)
handle_evo_exception(err)
return None
if update_state: # wait a moment for system to quiesce before updating state
@ -563,7 +476,7 @@ class EvoBroker:
try:
status = await self._location.refresh_status()
except evo.RequestFailed as err:
_handle_exception(err)
handle_evo_exception(err)
else:
async_dispatcher_send(self.hass, DOMAIN)
_LOGGER.debug("Status = %s", status)

View File

@ -0,0 +1,110 @@
"""Support for (EMEA/EU-based) Honeywell TCC systems."""
from __future__ import annotations
from datetime import datetime, timedelta
from http import HTTPStatus
import logging
import re
from typing import Any
import evohomeasync2 as evo
from homeassistant.const import CONF_SCAN_INTERVAL
import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__)
def dt_local_to_aware(dt_naive: datetime) -> datetime:
"""Convert a local/naive datetime to TZ-aware."""
dt_aware = dt_util.now() + (dt_naive - datetime.now())
if dt_aware.microsecond >= 500000:
dt_aware += timedelta(seconds=1)
return dt_aware.replace(microsecond=0)
def dt_aware_to_naive(dt_aware: datetime) -> datetime:
"""Convert a TZ-aware datetime to naive/local."""
dt_naive = datetime.now() + (dt_aware - dt_util.now())
if dt_naive.microsecond >= 500000:
dt_naive += timedelta(seconds=1)
return dt_naive.replace(microsecond=0)
def convert_until(status_dict: dict, until_key: str) -> None:
"""Reformat a dt str from "%Y-%m-%dT%H:%M:%SZ" as local/aware/isoformat."""
if until_key in status_dict and ( # only present for certain modes
dt_utc_naive := dt_util.parse_datetime(status_dict[until_key])
):
status_dict[until_key] = dt_util.as_local(dt_utc_naive).isoformat()
def convert_dict(dictionary: dict[str, Any]) -> dict[str, Any]:
"""Recursively convert a dict's keys to snake_case."""
def convert_key(key: str) -> str:
"""Convert a string to snake_case."""
string = re.sub(r"[\-\.\s]", "_", str(key))
return (
(string[0]).lower()
+ re.sub(
r"[A-Z]",
lambda matched: f"_{matched.group(0).lower()}", # type:ignore[str-bytes-safe]
string[1:],
)
)
return {
(convert_key(k) if isinstance(k, str) else k): (
convert_dict(v) if isinstance(v, dict) else v
)
for k, v in dictionary.items()
}
def handle_evo_exception(err: evo.RequestFailed) -> None:
"""Return False if the exception can't be ignored."""
try:
raise err
except evo.AuthenticationFailed:
_LOGGER.error(
(
"Failed to authenticate with the vendor's server. Check your username"
" and password. NB: Some special password characters that work"
" correctly via the website will not work via the web API. Message"
" is: %s"
),
err,
)
except evo.RequestFailed:
if err.status is None:
_LOGGER.warning(
(
"Unable to connect with the vendor's server. "
"Check your network and the vendor's service status page. "
"Message is: %s"
),
err,
)
elif err.status == HTTPStatus.SERVICE_UNAVAILABLE:
_LOGGER.warning(
"The vendor says their server is currently unavailable. "
"Check the vendor's service status page"
)
elif err.status == HTTPStatus.TOO_MANY_REQUESTS:
_LOGGER.warning(
(
"The vendor's API rate limit has been exceeded. "
"If this message persists, consider increasing the %s"
),
CONF_SCAN_INTERVAL,
)
else:
raise # we don't expect/handle any other Exceptions