mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 21:27:38 +00:00
Add some more typing to screenlogic (#88522)
This commit is contained in:
parent
84327f203c
commit
edb06c58fa
@ -90,23 +90,23 @@ async def async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None
|
|||||||
await hass.config_entries.async_reload(entry.entry_id)
|
await hass.config_entries.async_reload(entry.entry_id)
|
||||||
|
|
||||||
|
|
||||||
async def async_get_connect_info(hass: HomeAssistant, entry: ConfigEntry):
|
async def async_get_connect_info(
|
||||||
|
hass: HomeAssistant, entry: ConfigEntry
|
||||||
|
) -> dict[str, str | int]:
|
||||||
"""Construct connect_info from configuration entry and returns it to caller."""
|
"""Construct connect_info from configuration entry and returns it to caller."""
|
||||||
mac = entry.unique_id
|
mac = entry.unique_id
|
||||||
# Attempt to rediscover gateway to follow IP changes
|
# Attempt to rediscover gateway to follow IP changes
|
||||||
discovered_gateways = await async_discover_gateways_by_unique_id(hass)
|
discovered_gateways = await async_discover_gateways_by_unique_id(hass)
|
||||||
if mac in discovered_gateways:
|
if mac in discovered_gateways:
|
||||||
connect_info = discovered_gateways[mac]
|
return discovered_gateways[mac]
|
||||||
else:
|
|
||||||
_LOGGER.warning("Gateway rediscovery failed")
|
|
||||||
# Static connection defined or fallback from discovery
|
|
||||||
connect_info = {
|
|
||||||
SL_GATEWAY_NAME: name_for_mac(mac),
|
|
||||||
SL_GATEWAY_IP: entry.data[CONF_IP_ADDRESS],
|
|
||||||
SL_GATEWAY_PORT: entry.data[CONF_PORT],
|
|
||||||
}
|
|
||||||
|
|
||||||
return connect_info
|
_LOGGER.warning("Gateway rediscovery failed")
|
||||||
|
# Static connection defined or fallback from discovery
|
||||||
|
return {
|
||||||
|
SL_GATEWAY_NAME: name_for_mac(mac),
|
||||||
|
SL_GATEWAY_IP: entry.data[CONF_IP_ADDRESS],
|
||||||
|
SL_GATEWAY_PORT: entry.data[CONF_PORT],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class ScreenlogicDataUpdateCoordinator(DataUpdateCoordinator):
|
class ScreenlogicDataUpdateCoordinator(DataUpdateCoordinator):
|
||||||
@ -143,7 +143,7 @@ class ScreenlogicDataUpdateCoordinator(DataUpdateCoordinator):
|
|||||||
"""Return the gateway data."""
|
"""Return the gateway data."""
|
||||||
return self.gateway.get_data()
|
return self.gateway.get_data()
|
||||||
|
|
||||||
async def _async_update_configured_data(self):
|
async def _async_update_configured_data(self) -> None:
|
||||||
"""Update data sets based on equipment config."""
|
"""Update data sets based on equipment config."""
|
||||||
equipment_flags = self.gateway.get_data()[SL_DATA.KEY_CONFIG]["equipment_flags"]
|
equipment_flags = self.gateway.get_data()[SL_DATA.KEY_CONFIG]["equipment_flags"]
|
||||||
if not self.gateway.is_client:
|
if not self.gateway.is_client:
|
||||||
@ -155,7 +155,7 @@ class ScreenlogicDataUpdateCoordinator(DataUpdateCoordinator):
|
|||||||
if equipment_flags & EQUIPMENT.FLAG_CHLORINATOR:
|
if equipment_flags & EQUIPMENT.FLAG_CHLORINATOR:
|
||||||
await self.gateway.async_get_scg()
|
await self.gateway.async_get_scg()
|
||||||
|
|
||||||
async def _async_update_data(self):
|
async def _async_update_data(self) -> None:
|
||||||
"""Fetch data from the Screenlogic gateway."""
|
"""Fetch data from the Screenlogic gateway."""
|
||||||
try:
|
try:
|
||||||
await self._async_update_configured_data()
|
await self._async_update_configured_data()
|
||||||
@ -165,8 +165,9 @@ class ScreenlogicDataUpdateCoordinator(DataUpdateCoordinator):
|
|||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def _async_reconnect_update_data(self):
|
async def _async_reconnect_update_data(self) -> None:
|
||||||
"""Attempt to reconnect to the gateway and fetch data."""
|
"""Attempt to reconnect to the gateway and fetch data."""
|
||||||
|
assert self.config_entry is not None
|
||||||
try:
|
try:
|
||||||
# Clean up the previous connection as we're about to create a new one
|
# Clean up the previous connection as we're about to create a new one
|
||||||
await self.gateway.async_disconnect()
|
await self.gateway.async_disconnect()
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
"""Base ScreenLogicEntity definitions."""
|
"""Base ScreenLogicEntity definitions."""
|
||||||
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
# from screenlogicpy import ScreenLogicError, ScreenLogicGateway
|
from screenlogicpy import ScreenLogicGateway
|
||||||
from screenlogicpy.const import DATA as SL_DATA, EQUIPMENT, ON_OFF
|
from screenlogicpy.const import CODE, DATA as SL_DATA, EQUIPMENT, ON_OFF
|
||||||
|
|
||||||
from homeassistant.core import callback
|
from homeassistant.core import callback
|
||||||
from homeassistant.exceptions import HomeAssistantError
|
from homeassistant.exceptions import HomeAssistantError
|
||||||
@ -19,7 +20,12 @@ _LOGGER = logging.getLogger(__name__)
|
|||||||
class ScreenlogicEntity(CoordinatorEntity[ScreenlogicDataUpdateCoordinator]):
|
class ScreenlogicEntity(CoordinatorEntity[ScreenlogicDataUpdateCoordinator]):
|
||||||
"""Base class for all ScreenLogic entities."""
|
"""Base class for all ScreenLogic entities."""
|
||||||
|
|
||||||
def __init__(self, coordinator, data_key, enabled=True):
|
def __init__(
|
||||||
|
self,
|
||||||
|
coordinator: ScreenlogicDataUpdateCoordinator,
|
||||||
|
data_key: str,
|
||||||
|
enabled: bool = True,
|
||||||
|
) -> None:
|
||||||
"""Initialize of the entity."""
|
"""Initialize of the entity."""
|
||||||
super().__init__(coordinator)
|
super().__init__(coordinator)
|
||||||
self._data_key = data_key
|
self._data_key = data_key
|
||||||
@ -34,8 +40,10 @@ class ScreenlogicEntity(CoordinatorEntity[ScreenlogicDataUpdateCoordinator]):
|
|||||||
]
|
]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
equipment_model = f"Unknown Model C:{controller_type} H:{hardware_type}"
|
equipment_model = f"Unknown Model C:{controller_type} H:{hardware_type}"
|
||||||
|
mac = self.mac
|
||||||
|
assert mac is not None
|
||||||
self._attr_device_info = DeviceInfo(
|
self._attr_device_info = DeviceInfo(
|
||||||
connections={(dr.CONNECTION_NETWORK_MAC, self.mac)},
|
connections={(dr.CONNECTION_NETWORK_MAC, mac)},
|
||||||
manufacturer="Pentair",
|
manufacturer="Pentair",
|
||||||
model=equipment_model,
|
model=equipment_model,
|
||||||
name=self.gateway_name,
|
name=self.gateway_name,
|
||||||
@ -43,17 +51,18 @@ class ScreenlogicEntity(CoordinatorEntity[ScreenlogicDataUpdateCoordinator]):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def mac(self):
|
def mac(self) -> str | None:
|
||||||
"""Mac address."""
|
"""Mac address."""
|
||||||
|
assert self.coordinator.config_entry is not None
|
||||||
return self.coordinator.config_entry.unique_id
|
return self.coordinator.config_entry.unique_id
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def config_data(self):
|
def config_data(self) -> dict[str | int, Any]:
|
||||||
"""Shortcut for config data."""
|
"""Shortcut for config data."""
|
||||||
return self.gateway_data[SL_DATA.KEY_CONFIG]
|
return self.gateway_data[SL_DATA.KEY_CONFIG]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def gateway(self):
|
def gateway(self) -> ScreenLogicGateway:
|
||||||
"""Return the gateway."""
|
"""Return the gateway."""
|
||||||
return self.coordinator.gateway
|
return self.coordinator.gateway
|
||||||
|
|
||||||
@ -63,18 +72,18 @@ class ScreenlogicEntity(CoordinatorEntity[ScreenlogicDataUpdateCoordinator]):
|
|||||||
return self.gateway.get_data()
|
return self.gateway.get_data()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def gateway_name(self):
|
def gateway_name(self) -> str:
|
||||||
"""Return the configured name of the gateway."""
|
"""Return the configured name of the gateway."""
|
||||||
return self.gateway.name
|
return self.gateway.name
|
||||||
|
|
||||||
async def _async_refresh(self):
|
async def _async_refresh(self) -> None:
|
||||||
"""Refresh the data from the gateway."""
|
"""Refresh the data from the gateway."""
|
||||||
await self.coordinator.async_refresh()
|
await self.coordinator.async_refresh()
|
||||||
# Second debounced refresh to catch any secondary
|
# Second debounced refresh to catch any secondary
|
||||||
# changes in the device
|
# changes in the device
|
||||||
await self.coordinator.async_request_refresh()
|
await self.coordinator.async_request_refresh()
|
||||||
|
|
||||||
async def _async_refresh_timed(self, now):
|
async def _async_refresh_timed(self, now: datetime) -> None:
|
||||||
"""Refresh from a timed called."""
|
"""Refresh from a timed called."""
|
||||||
await self.coordinator.async_request_refresh()
|
await self.coordinator.async_request_refresh()
|
||||||
|
|
||||||
@ -82,7 +91,13 @@ class ScreenlogicEntity(CoordinatorEntity[ScreenlogicDataUpdateCoordinator]):
|
|||||||
class ScreenLogicPushEntity(ScreenlogicEntity):
|
class ScreenLogicPushEntity(ScreenlogicEntity):
|
||||||
"""Base class for all ScreenLogic push entities."""
|
"""Base class for all ScreenLogic push entities."""
|
||||||
|
|
||||||
def __init__(self, coordinator, data_key, message_code, enabled=True):
|
def __init__(
|
||||||
|
self,
|
||||||
|
coordinator: ScreenlogicDataUpdateCoordinator,
|
||||||
|
data_key: str,
|
||||||
|
message_code: CODE,
|
||||||
|
enabled: bool = True,
|
||||||
|
) -> None:
|
||||||
"""Initialize the entity."""
|
"""Initialize the entity."""
|
||||||
super().__init__(coordinator, data_key, enabled)
|
super().__init__(coordinator, data_key, enabled)
|
||||||
self._update_message_code = message_code
|
self._update_message_code = message_code
|
||||||
@ -108,7 +123,7 @@ class ScreenLogicCircuitEntity(ScreenLogicPushEntity):
|
|||||||
_attr_has_entity_name = True
|
_attr_has_entity_name = True
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self):
|
def name(self) -> str:
|
||||||
"""Get the name of the switch."""
|
"""Get the name of the switch."""
|
||||||
return self.circuit["name"]
|
return self.circuit["name"]
|
||||||
|
|
||||||
@ -117,15 +132,15 @@ class ScreenLogicCircuitEntity(ScreenLogicPushEntity):
|
|||||||
"""Get whether the switch is in on state."""
|
"""Get whether the switch is in on state."""
|
||||||
return self.circuit["value"] == ON_OFF.ON
|
return self.circuit["value"] == ON_OFF.ON
|
||||||
|
|
||||||
async def async_turn_on(self, **kwargs) -> None:
|
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||||
"""Send the ON command."""
|
"""Send the ON command."""
|
||||||
await self._async_set_circuit(ON_OFF.ON)
|
await self._async_set_circuit(ON_OFF.ON)
|
||||||
|
|
||||||
async def async_turn_off(self, **kwargs) -> None:
|
async def async_turn_off(self, **kwargs: Any) -> None:
|
||||||
"""Send the OFF command."""
|
"""Send the OFF command."""
|
||||||
await self._async_set_circuit(ON_OFF.OFF)
|
await self._async_set_circuit(ON_OFF.OFF)
|
||||||
|
|
||||||
async def _async_set_circuit(self, circuit_value) -> None:
|
async def _async_set_circuit(self, circuit_value: int) -> None:
|
||||||
if not await self.gateway.async_set_circuit(self._data_key, circuit_value):
|
if not await self.gateway.async_set_circuit(self._data_key, circuit_value):
|
||||||
raise HomeAssistantError(
|
raise HomeAssistantError(
|
||||||
f"Failed to set_circuit {self._data_key} {circuit_value}"
|
f"Failed to set_circuit {self._data_key} {circuit_value}"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user