Update typing (2) [a-i] (#63923)

This commit is contained in:
Marc Mueller 2022-01-11 21:23:26 +01:00 committed by GitHub
parent 17bf51a855
commit 4e2cd1bec0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 27 additions and 29 deletions

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from datetime import timedelta from datetime import timedelta
import logging import logging
from typing import Any, Dict from typing import Any
from accuweather import AccuWeather, ApiError, InvalidApiKeyError, RequestsExceededError from accuweather import AccuWeather, ApiError, InvalidApiKeyError, RequestsExceededError
from aiohttp import ClientSession from aiohttp import ClientSession
@ -63,7 +63,7 @@ async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
await hass.config_entries.async_reload(entry.entry_id) await hass.config_entries.async_reload(entry.entry_id)
class AccuWeatherDataUpdateCoordinator(DataUpdateCoordinator[Dict[str, Any]]): class AccuWeatherDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Class to manage fetching AccuWeather data API.""" """Class to manage fetching AccuWeather data API."""
def __init__( def __init__(

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from collections.abc import Mapping from collections.abc import Mapping
from datetime import timedelta from datetime import timedelta
from math import ceil from math import ceil
from typing import Any, Dict, cast from typing import Any, cast
from pyairvisual import CloudAPI, NodeSamba from pyairvisual import CloudAPI, NodeSamba
from pyairvisual.errors import ( from pyairvisual.errors import (
@ -213,7 +213,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
try: try:
data = await api_coro data = await api_coro
return cast(Dict[str, Any], data) return cast(dict[str, Any], data)
except (InvalidKeyError, KeyExpiredError) as ex: except (InvalidKeyError, KeyExpiredError) as ex:
raise ConfigEntryAuthFailed from ex raise ConfigEntryAuthFailed from ex
except AirVisualError as err: except AirVisualError as err:
@ -256,7 +256,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
entry.data[CONF_IP_ADDRESS], entry.data[CONF_PASSWORD] entry.data[CONF_IP_ADDRESS], entry.data[CONF_PASSWORD]
) as node: ) as node:
data = await node.async_get_latest_measurements() data = await node.async_get_latest_measurements()
return cast(Dict[str, Any], data) return cast(dict[str, Any], data)
except NodeProError as err: except NodeProError as err:
raise UpdateFailed(f"Error while retrieving data: {err}") from err raise UpdateFailed(f"Error while retrieving data: {err}") from err

View File

@ -3,7 +3,6 @@ from __future__ import annotations
from datetime import timedelta from datetime import timedelta
import logging import logging
from typing import Dict
from aioaseko import APIUnavailable, MobileAccount, Unit, Variable from aioaseko import APIUnavailable, MobileAccount, Unit, Variable
@ -52,7 +51,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return unload_ok return unload_ok
class AsekoDataUpdateCoordinator(DataUpdateCoordinator[Dict[str, Variable]]): class AsekoDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Variable]]):
"""Class to manage fetching Aseko unit data from single endpoint.""" """Class to manage fetching Aseko unit data from single endpoint."""
def __init__(self, hass: HomeAssistant, unit: Unit) -> None: def __init__(self, hass: HomeAssistant, unit: Unit) -> None:

View File

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any, Awaitable, Callable, Dict, TypedDict, cast from typing import Any, Awaitable, Callable, TypedDict, cast
import voluptuous as vol import voluptuous as vol
from voluptuous.humanize import humanize_error from voluptuous.humanize import humanize_error
@ -632,7 +632,7 @@ async def _async_process_config(
try: try:
raw_config = blueprint_inputs.async_substitute() raw_config = blueprint_inputs.async_substitute()
config_block = cast( config_block = cast(
Dict[str, Any], dict[str, Any],
await async_validate_config_item(hass, raw_config), await async_validate_config_item(hass, raw_config),
) )
except vol.Invalid as err: except vol.Invalid as err:

View File

@ -1,9 +1,8 @@
"""Constants for the Canary integration.""" """Constants for the Canary integration."""
from __future__ import annotations from __future__ import annotations
from collections.abc import ValuesView from collections.abc import ValuesView
from typing import List, Optional, Tuple, TypedDict from typing import Optional, TypedDict
from canary.api import Location from canary.api import Location
@ -15,4 +14,4 @@ class CanaryData(TypedDict):
readings: dict[str, ValuesView] readings: dict[str, ValuesView]
SensorTypeItem = Tuple[str, Optional[str], Optional[str], Optional[str], List[str]] SensorTypeItem = tuple[str, Optional[str], Optional[str], Optional[str], list[str]]

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import timedelta from datetime import timedelta
from typing import TYPE_CHECKING, Any, Dict, TypeVar from typing import TYPE_CHECKING, Any, TypeVar
from pyfronius import BadStatusError, FroniusError from pyfronius import BadStatusError, FroniusError
@ -35,7 +35,7 @@ if TYPE_CHECKING:
class FroniusCoordinatorBase( class FroniusCoordinatorBase(
ABC, DataUpdateCoordinator[Dict[SolarNetId, Dict[str, Any]]] ABC, DataUpdateCoordinator[dict[SolarNetId, dict[str, Any]]]
): ):
"""Query Fronius endpoint and keep track of seen conditions.""" """Query Fronius endpoint and keep track of seen conditions."""

View File

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any, Dict, cast from typing import Any, cast
from aiohttp import ClientSession from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientConnectorError from aiohttp.client_exceptions import ClientConnectorError
@ -89,7 +89,7 @@ class GiosDataUpdateCoordinator(DataUpdateCoordinator):
"""Update data via library.""" """Update data via library."""
try: try:
async with timeout(API_TIMEOUT): async with timeout(API_TIMEOUT):
return cast(Dict[str, Any], await self.gios.async_update()) return cast(dict[str, Any], await self.gios.async_update())
except ( except (
ApiError, ApiError,
NoStationError, NoStationError,

View File

@ -6,7 +6,7 @@ import asyncio
from collections.abc import Iterable from collections.abc import Iterable
from contextvars import ContextVar from contextvars import ContextVar
import logging import logging
from typing import Any, List, cast from typing import Any, cast
import voluptuous as vol import voluptuous as vol
@ -191,7 +191,7 @@ def get_entity_ids(
entity_ids = group.attributes[ATTR_ENTITY_ID] entity_ids = group.attributes[ATTR_ENTITY_ID]
if not domain_filter: if not domain_filter:
return cast(List[str], entity_ids) return cast(list[str], entity_ids)
domain_filter = f"{domain_filter.lower()}." domain_filter = f"{domain_filter.lower()}."

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from collections import Counter from collections import Counter
import itertools import itertools
import logging import logging
from typing import Any, Set, cast from typing import Any, cast
import voluptuous as vol import voluptuous as vol
@ -252,7 +252,7 @@ class LightGroup(GroupEntity, LightEntity):
if all_supported_color_modes: if all_supported_color_modes:
# Merge all color modes. # Merge all color modes.
self._attr_supported_color_modes = cast( self._attr_supported_color_modes = cast(
Set[str], set().union(*all_supported_color_modes) set[str], set().union(*all_supported_color_modes)
) )
self._attr_supported_features = 0 self._attr_supported_features = 0

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from datetime import timedelta from datetime import timedelta
from typing import Any, Dict, cast from typing import Any, cast
from aioguardian import Client from aioguardian import Client
from aioguardian.errors import GuardianError from aioguardian.errors import GuardianError
@ -49,4 +49,4 @@ class GuardianDataUpdateCoordinator(DataUpdateCoordinator[dict]):
resp = await self._api_coro() resp = await self._api_coro()
except GuardianError as err: except GuardianError as err:
raise UpdateFailed(err) from err raise UpdateFailed(err) from err
return cast(Dict[str, Any], resp["data"]) return cast(dict[str, Any], resp["data"])

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
import logging import logging
import re import re
from typing import Any, Dict, List, cast from typing import Any, cast
from stringcase import snakecase from stringcase import snakecase
@ -34,7 +34,7 @@ _LOGGER = logging.getLogger(__name__)
_DEVICE_SCAN = f"{DEVICE_TRACKER_DOMAIN}/device_scan" _DEVICE_SCAN = f"{DEVICE_TRACKER_DOMAIN}/device_scan"
_HostType = Dict[str, Any] _HostType = dict[str, Any]
def _get_hosts( def _get_hosts(
@ -44,7 +44,7 @@ def _get_hosts(
if not ignore_subscriptions and key not in router.subscriptions: if not ignore_subscriptions and key not in router.subscriptions:
continue continue
try: try:
return cast(List[_HostType], router.data[key]["Hosts"]["Host"]) return cast(list[_HostType], router.data[key]["Hosts"]["Host"])
except KeyError: except KeyError:
_LOGGER.debug("%s[%s][%s] not in data", key, "Hosts", "Host") _LOGGER.debug("%s[%s][%s] not in data", key, "Hosts", "Host")
return None return None

View File

@ -2,7 +2,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Any, Dict, cast from typing import Any, cast
import voluptuous as vol import voluptuous as vol
@ -187,7 +187,7 @@ class InputSelectStorageCollection(collection.StorageCollection):
async def _process_create_data(self, data: dict[str, Any]) -> dict[str, Any]: async def _process_create_data(self, data: dict[str, Any]) -> dict[str, Any]:
"""Validate the config is valid.""" """Validate the config is valid."""
return cast(Dict[str, Any], self.CREATE_SCHEMA(data)) return cast(dict[str, Any], self.CREATE_SCHEMA(data))
@callback @callback
def _get_suggested_id(self, info: dict[str, Any]) -> str: def _get_suggested_id(self, info: dict[str, Any]) -> str:

View File

@ -5,7 +5,7 @@ import asyncio
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from datetime import timedelta from datetime import timedelta
from functools import partial from functools import partial
from typing import Any, Dict, cast from typing import Any, cast
from pyiqvia import Client from pyiqvia import Client
from pyiqvia.errors import IQVIAError from pyiqvia.errors import IQVIAError
@ -65,7 +65,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
except IQVIAError as err: except IQVIAError as err:
raise UpdateFailed from err raise UpdateFailed from err
return cast(Dict[str, Any], data) return cast(dict[str, Any], data)
coordinators = {} coordinators = {}
init_data_update_tasks = [] init_data_update_tasks = []