mirror of
https://github.com/home-assistant/core.git
synced 2025-07-18 18:57:06 +00:00
Improve DataUpdateCoordinator typing in integrations (7) (#84890)
This commit is contained in:
parent
0a77232444
commit
7da434f455
@ -4,6 +4,7 @@ from __future__ import annotations
|
|||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
import socket
|
import socket
|
||||||
from ssl import SSLError
|
from ssl import SSLError
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from deluge_client.client import DelugeRPCClient, FailedToReconnectException
|
from deluge_client.client import DelugeRPCClient, FailedToReconnectException
|
||||||
|
|
||||||
@ -16,7 +17,9 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, Upda
|
|||||||
from .const import DATA_KEYS, LOGGER
|
from .const import DATA_KEYS, LOGGER
|
||||||
|
|
||||||
|
|
||||||
class DelugeDataUpdateCoordinator(DataUpdateCoordinator):
|
class DelugeDataUpdateCoordinator(
|
||||||
|
DataUpdateCoordinator[dict[Platform, dict[str, Any]]]
|
||||||
|
):
|
||||||
"""Data update coordinator for the Deluge integration."""
|
"""Data update coordinator for the Deluge integration."""
|
||||||
|
|
||||||
config_entry: ConfigEntry
|
config_entry: ConfigEntry
|
||||||
@ -34,7 +37,7 @@ class DelugeDataUpdateCoordinator(DataUpdateCoordinator):
|
|||||||
self.api = api
|
self.api = api
|
||||||
self.config_entry = entry
|
self.config_entry = entry
|
||||||
|
|
||||||
async def _async_update_data(self) -> dict[Platform, dict[str, int | str]]:
|
async def _async_update_data(self) -> dict[Platform, dict[str, Any]]:
|
||||||
"""Get the latest data from Deluge and updates the state."""
|
"""Get the latest data from Deluge and updates the state."""
|
||||||
data = {}
|
data = {}
|
||||||
try:
|
try:
|
||||||
|
@ -45,7 +45,7 @@ class DelugeSwitch(DelugeEntity, SwitchEntity):
|
|||||||
def is_on(self) -> bool:
|
def is_on(self) -> bool:
|
||||||
"""Return state of the switch."""
|
"""Return state of the switch."""
|
||||||
if self.coordinator.data:
|
if self.coordinator.data:
|
||||||
data: dict = self.coordinator.data[Platform.SWITCH]
|
data = self.coordinator.data[Platform.SWITCH]
|
||||||
for torrent in data.values():
|
for torrent in data.values():
|
||||||
item = torrent.popitem()
|
item = torrent.popitem()
|
||||||
if not item[1]:
|
if not item[1]:
|
||||||
|
@ -11,7 +11,7 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, Upda
|
|||||||
from .const import DOMAIN, LOGGER
|
from .const import DOMAIN, LOGGER
|
||||||
|
|
||||||
|
|
||||||
class GoalZeroDataUpdateCoordinator(DataUpdateCoordinator):
|
class GoalZeroDataUpdateCoordinator(DataUpdateCoordinator[None]):
|
||||||
"""Data update coordinator for the Goal zero integration."""
|
"""Data update coordinator for the Goal zero integration."""
|
||||||
|
|
||||||
config_entry: ConfigEntry
|
config_entry: ConfigEntry
|
||||||
|
@ -52,10 +52,10 @@ class GoalZeroSwitch(GoalZeroEntity, SwitchEntity):
|
|||||||
"""Turn off the switch."""
|
"""Turn off the switch."""
|
||||||
payload = {self.entity_description.key: 0}
|
payload = {self.entity_description.key: 0}
|
||||||
await self._api.post_state(payload=payload)
|
await self._api.post_state(payload=payload)
|
||||||
self.coordinator.async_set_updated_data(data=payload)
|
self.coordinator.async_set_updated_data(None)
|
||||||
|
|
||||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||||
"""Turn on the switch."""
|
"""Turn on the switch."""
|
||||||
payload = {self.entity_description.key: 1}
|
payload = {self.entity_description.key: 1}
|
||||||
await self._api.post_state(payload=payload)
|
await self._api.post_state(payload=payload)
|
||||||
self.coordinator.async_set_updated_data(data=payload)
|
self.coordinator.async_set_updated_data(None)
|
||||||
|
@ -358,7 +358,7 @@ class LIFXUpdateCoordinator(DataUpdateCoordinator[None]):
|
|||||||
return self.active_effect.value
|
return self.active_effect.value
|
||||||
|
|
||||||
|
|
||||||
class LIFXSensorUpdateCoordinator(DataUpdateCoordinator):
|
class LIFXSensorUpdateCoordinator(DataUpdateCoordinator[None]):
|
||||||
"""DataUpdateCoordinator to gather data for a specific lifx device."""
|
"""DataUpdateCoordinator to gather data for a specific lifx device."""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
|
@ -101,7 +101,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
except RequestException as err:
|
except RequestException as err:
|
||||||
raise exceptions.ConfigEntryNotReady from err
|
raise exceptions.ConfigEntryNotReady from err
|
||||||
|
|
||||||
async def async_update_data():
|
async def async_update_data() -> None:
|
||||||
"""Fetch data from Nuki bridge."""
|
"""Fetch data from Nuki bridge."""
|
||||||
try:
|
try:
|
||||||
# Note: asyncio.TimeoutError and aiohttp.ClientError are already
|
# Note: asyncio.TimeoutError and aiohttp.ClientError are already
|
||||||
@ -161,7 +161,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
return unload_ok
|
return unload_ok
|
||||||
|
|
||||||
|
|
||||||
class NukiEntity(CoordinatorEntity):
|
class NukiEntity(CoordinatorEntity[DataUpdateCoordinator[None]]):
|
||||||
"""An entity using CoordinatorEntity.
|
"""An entity using CoordinatorEntity.
|
||||||
|
|
||||||
The CoordinatorEntity class provides:
|
The CoordinatorEntity class provides:
|
||||||
|
@ -9,6 +9,7 @@ from homeassistant.components.binary_sensor import (
|
|||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||||
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
||||||
|
|
||||||
from . import NukiEntity
|
from . import NukiEntity
|
||||||
from .const import ATTR_NUKI_ID, DATA_COORDINATOR, DATA_LOCKS, DOMAIN as NUKI_DOMAIN
|
from .const import ATTR_NUKI_ID, DATA_COORDINATOR, DATA_LOCKS, DOMAIN as NUKI_DOMAIN
|
||||||
@ -19,7 +20,7 @@ async def async_setup_entry(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Set up the Nuki lock binary sensor."""
|
"""Set up the Nuki lock binary sensor."""
|
||||||
data = hass.data[NUKI_DOMAIN][entry.entry_id]
|
data = hass.data[NUKI_DOMAIN][entry.entry_id]
|
||||||
coordinator = data[DATA_COORDINATOR]
|
coordinator: DataUpdateCoordinator[None] = data[DATA_COORDINATOR]
|
||||||
|
|
||||||
entities = []
|
entities = []
|
||||||
|
|
||||||
|
@ -14,6 +14,7 @@ from homeassistant.config_entries import ConfigEntry
|
|||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
from homeassistant.helpers import config_validation as cv, entity_platform
|
from homeassistant.helpers import config_validation as cv, entity_platform
|
||||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||||
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
||||||
|
|
||||||
from . import NukiEntity
|
from . import NukiEntity
|
||||||
from .const import (
|
from .const import (
|
||||||
@ -35,7 +36,7 @@ async def async_setup_entry(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Set up the Nuki lock platform."""
|
"""Set up the Nuki lock platform."""
|
||||||
data = hass.data[NUKI_DOMAIN][entry.entry_id]
|
data = hass.data[NUKI_DOMAIN][entry.entry_id]
|
||||||
coordinator = data[DATA_COORDINATOR]
|
coordinator: DataUpdateCoordinator[None] = data[DATA_COORDINATOR]
|
||||||
|
|
||||||
entities: list[NukiDeviceEntity] = [
|
entities: list[NukiDeviceEntity] = [
|
||||||
NukiLockEntity(coordinator, lock) for lock in data[DATA_LOCKS]
|
NukiLockEntity(coordinator, lock) for lock in data[DATA_LOCKS]
|
||||||
|
@ -162,7 +162,7 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->
|
|||||||
return unload_ok
|
return unload_ok
|
||||||
|
|
||||||
|
|
||||||
class TomorrowioDataUpdateCoordinator(DataUpdateCoordinator):
|
class TomorrowioDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||||
"""Define an object to hold Tomorrow.io data."""
|
"""Define an object to hold Tomorrow.io data."""
|
||||||
|
|
||||||
def __init__(self, hass: HomeAssistant, api: TomorrowioV4) -> None:
|
def __init__(self, hass: HomeAssistant, api: TomorrowioV4) -> None:
|
||||||
@ -235,7 +235,7 @@ class TomorrowioDataUpdateCoordinator(DataUpdateCoordinator):
|
|||||||
|
|
||||||
async def _async_update_data(self) -> dict[str, Any]:
|
async def _async_update_data(self) -> dict[str, Any]:
|
||||||
"""Update data via library."""
|
"""Update data via library."""
|
||||||
data = {}
|
data: dict[str, Any] = {}
|
||||||
# If we are refreshing because of a new config entry that's not already in our
|
# If we are refreshing because of a new config entry that's not already in our
|
||||||
# data, we do a partial refresh to avoid wasted API calls.
|
# data, we do a partial refresh to avoid wasted API calls.
|
||||||
if self.data and any(
|
if self.data and any(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user