mirror of
https://github.com/home-assistant/core.git
synced 2025-04-26 10:17:51 +00:00
516 lines
18 KiB
Python
516 lines
18 KiB
Python
"""Support for SmartThings Cloud."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
import contextlib
|
|
from dataclasses import dataclass
|
|
from http import HTTPStatus
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any, cast
|
|
|
|
from aiohttp import ClientResponseError
|
|
from pysmartthings import (
|
|
Attribute,
|
|
Capability,
|
|
ComponentStatus,
|
|
Device,
|
|
DeviceEvent,
|
|
Lifecycle,
|
|
Scene,
|
|
SmartThings,
|
|
SmartThingsAuthenticationFailedError,
|
|
SmartThingsConnectionError,
|
|
SmartThingsSinkError,
|
|
Status,
|
|
)
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import (
|
|
ATTR_CONNECTIONS,
|
|
ATTR_HW_VERSION,
|
|
ATTR_MANUFACTURER,
|
|
ATTR_MODEL,
|
|
ATTR_SW_VERSION,
|
|
ATTR_VIA_DEVICE,
|
|
CONF_ACCESS_TOKEN,
|
|
CONF_TOKEN,
|
|
EVENT_HOMEASSISTANT_STOP,
|
|
Platform,
|
|
)
|
|
from homeassistant.core import Event, HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
|
from homeassistant.helpers import device_registry as dr, entity_registry as er
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
from homeassistant.helpers.config_entry_oauth2_flow import (
|
|
OAuth2Session,
|
|
async_get_config_entry_implementation,
|
|
)
|
|
from homeassistant.helpers.entity_registry import RegistryEntry, async_migrate_entries
|
|
|
|
from .const import (
|
|
BINARY_SENSOR_ATTRIBUTES_TO_CAPABILITIES,
|
|
CONF_INSTALLED_APP_ID,
|
|
CONF_LOCATION_ID,
|
|
CONF_SUBSCRIPTION_ID,
|
|
DOMAIN,
|
|
EVENT_BUTTON,
|
|
MAIN,
|
|
OLD_DATA,
|
|
SENSOR_ATTRIBUTES_TO_CAPABILITIES,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class SmartThingsData:
|
|
"""Define an object to hold SmartThings data."""
|
|
|
|
devices: dict[str, FullDevice]
|
|
scenes: dict[str, Scene]
|
|
rooms: dict[str, str]
|
|
client: SmartThings
|
|
|
|
|
|
@dataclass
|
|
class FullDevice:
|
|
"""Define an object to hold device data."""
|
|
|
|
device: Device
|
|
status: dict[str, ComponentStatus]
|
|
|
|
|
|
type SmartThingsConfigEntry = ConfigEntry[SmartThingsData]
|
|
|
|
PLATFORMS = [
|
|
Platform.BINARY_SENSOR,
|
|
Platform.BUTTON,
|
|
Platform.CLIMATE,
|
|
Platform.COVER,
|
|
Platform.EVENT,
|
|
Platform.FAN,
|
|
Platform.LIGHT,
|
|
Platform.LOCK,
|
|
Platform.MEDIA_PLAYER,
|
|
Platform.NUMBER,
|
|
Platform.SCENE,
|
|
Platform.SELECT,
|
|
Platform.SENSOR,
|
|
Platform.SWITCH,
|
|
Platform.UPDATE,
|
|
Platform.VALVE,
|
|
]
|
|
|
|
|
|
async def async_setup_entry(hass: HomeAssistant, entry: SmartThingsConfigEntry) -> bool:
|
|
"""Initialize config entry which represents an installed SmartApp."""
|
|
# The oauth smartthings entry will have a token, older ones are version 3
|
|
# after migration but still require reauthentication
|
|
if CONF_TOKEN not in entry.data:
|
|
raise ConfigEntryAuthFailed("Config entry missing token")
|
|
implementation = await async_get_config_entry_implementation(hass, entry)
|
|
session = OAuth2Session(hass, entry, implementation)
|
|
|
|
try:
|
|
await session.async_ensure_token_valid()
|
|
except ClientResponseError as err:
|
|
if err.status == HTTPStatus.BAD_REQUEST:
|
|
raise ConfigEntryAuthFailed("Token not valid, trigger renewal") from err
|
|
raise ConfigEntryNotReady from err
|
|
|
|
client = SmartThings(session=async_get_clientsession(hass))
|
|
|
|
async def _refresh_token() -> str:
|
|
await session.async_ensure_token_valid()
|
|
token = session.token[CONF_ACCESS_TOKEN]
|
|
if TYPE_CHECKING:
|
|
assert isinstance(token, str)
|
|
return token
|
|
|
|
client.refresh_token_function = _refresh_token
|
|
|
|
def _handle_max_connections() -> None:
|
|
_LOGGER.debug(
|
|
"We hit the limit of max connections or we could not remove the old one, so retrying"
|
|
)
|
|
hass.config_entries.async_schedule_reload(entry.entry_id)
|
|
|
|
client.max_connections_reached_callback = _handle_max_connections
|
|
|
|
def _handle_new_subscription_identifier(identifier: str | None) -> None:
|
|
"""Handle a new subscription identifier."""
|
|
hass.config_entries.async_update_entry(
|
|
entry,
|
|
data={
|
|
**entry.data,
|
|
CONF_SUBSCRIPTION_ID: identifier,
|
|
},
|
|
)
|
|
if identifier is not None:
|
|
_LOGGER.debug("Updating subscription ID to %s", identifier)
|
|
else:
|
|
_LOGGER.debug("Removing subscription ID")
|
|
|
|
client.new_subscription_id_callback = _handle_new_subscription_identifier
|
|
|
|
if (old_identifier := entry.data.get(CONF_SUBSCRIPTION_ID)) is not None:
|
|
_LOGGER.debug("Trying to delete old subscription %s", old_identifier)
|
|
try:
|
|
await client.delete_subscription(old_identifier)
|
|
except SmartThingsConnectionError as err:
|
|
raise ConfigEntryNotReady("Could not delete old subscription") from err
|
|
|
|
_LOGGER.debug("Trying to create a new subscription")
|
|
try:
|
|
subscription = await client.create_subscription(
|
|
entry.data[CONF_LOCATION_ID],
|
|
entry.data[CONF_TOKEN][CONF_INSTALLED_APP_ID],
|
|
)
|
|
except SmartThingsSinkError as err:
|
|
_LOGGER.exception("Couldn't create a new subscription")
|
|
raise ConfigEntryNotReady from err
|
|
subscription_id = subscription.subscription_id
|
|
_handle_new_subscription_identifier(subscription_id)
|
|
|
|
entry.async_create_background_task(
|
|
hass,
|
|
client.subscribe(
|
|
entry.data[CONF_LOCATION_ID],
|
|
entry.data[CONF_TOKEN][CONF_INSTALLED_APP_ID],
|
|
subscription,
|
|
),
|
|
"smartthings_socket",
|
|
)
|
|
|
|
device_status: dict[str, FullDevice] = {}
|
|
try:
|
|
rooms = {
|
|
room.room_id: room.name
|
|
for room in await client.get_rooms(location_id=entry.data[CONF_LOCATION_ID])
|
|
}
|
|
devices = await client.get_devices()
|
|
for device in devices:
|
|
status = process_status(await client.get_device_status(device.device_id))
|
|
device_status[device.device_id] = FullDevice(device=device, status=status)
|
|
except SmartThingsAuthenticationFailedError as err:
|
|
raise ConfigEntryAuthFailed from err
|
|
|
|
device_registry = dr.async_get(hass)
|
|
create_devices(device_registry, device_status, entry, rooms)
|
|
|
|
scenes = {
|
|
scene.scene_id: scene
|
|
for scene in await client.get_scenes(location_id=entry.data[CONF_LOCATION_ID])
|
|
}
|
|
|
|
def handle_deleted_device(device_id: str) -> None:
|
|
"""Handle a deleted device."""
|
|
dev_entry = device_registry.async_get_device(
|
|
identifiers={(DOMAIN, device_id)},
|
|
)
|
|
if dev_entry is not None:
|
|
device_registry.async_update_device(
|
|
dev_entry.id, remove_config_entry_id=entry.entry_id
|
|
)
|
|
|
|
entry.async_on_unload(
|
|
client.add_device_lifecycle_event_listener(
|
|
Lifecycle.DELETE, handle_deleted_device
|
|
)
|
|
)
|
|
|
|
entry.runtime_data = SmartThingsData(
|
|
devices={
|
|
device_id: device
|
|
for device_id, device in device_status.items()
|
|
if MAIN in device.status
|
|
},
|
|
client=client,
|
|
scenes=scenes,
|
|
rooms=rooms,
|
|
)
|
|
|
|
# Events are deprecated and will be removed in 2025.10
|
|
def handle_button_press(event: DeviceEvent) -> None:
|
|
"""Handle a button press."""
|
|
if (
|
|
event.capability is Capability.BUTTON
|
|
and event.attribute is Attribute.BUTTON
|
|
):
|
|
hass.bus.async_fire(
|
|
EVENT_BUTTON,
|
|
{
|
|
"component_id": event.component_id,
|
|
"device_id": event.device_id,
|
|
"location_id": event.location_id,
|
|
"value": event.value,
|
|
"name": entry.runtime_data.devices[event.device_id].device.label,
|
|
"data": event.data,
|
|
},
|
|
)
|
|
|
|
entry.async_on_unload(
|
|
client.add_unspecified_device_event_listener(handle_button_press)
|
|
)
|
|
|
|
async def _handle_shutdown(_: Event) -> None:
|
|
"""Handle shutdown."""
|
|
await client.delete_subscription(subscription_id)
|
|
|
|
entry.async_on_unload(
|
|
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _handle_shutdown)
|
|
)
|
|
|
|
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
|
|
|
device_entries = dr.async_entries_for_config_entry(device_registry, entry.entry_id)
|
|
for device_entry in device_entries:
|
|
device_id = next(
|
|
identifier[1]
|
|
for identifier in device_entry.identifiers
|
|
if identifier[0] == DOMAIN
|
|
)
|
|
if device_id in device_status:
|
|
continue
|
|
device_registry.async_update_device(
|
|
device_entry.id, remove_config_entry_id=entry.entry_id
|
|
)
|
|
|
|
return True
|
|
|
|
|
|
async def async_unload_entry(
|
|
hass: HomeAssistant, entry: SmartThingsConfigEntry
|
|
) -> bool:
|
|
"""Unload a config entry."""
|
|
client = entry.runtime_data.client
|
|
if (subscription_id := entry.data.get(CONF_SUBSCRIPTION_ID)) is not None:
|
|
with contextlib.suppress(SmartThingsConnectionError):
|
|
await client.delete_subscription(subscription_id)
|
|
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
|
|
|
|
|
async def async_migrate_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|
"""Handle config entry migration."""
|
|
|
|
if entry.version < 3:
|
|
# We keep the old data around, so we can use that to clean up the webhook in the future
|
|
hass.config_entries.async_update_entry(
|
|
entry, version=3, data={OLD_DATA: dict(entry.data)}
|
|
)
|
|
|
|
if entry.minor_version < 2:
|
|
|
|
def migrate_entities(entity_entry: RegistryEntry) -> dict[str, Any] | None:
|
|
if entity_entry.domain == "binary_sensor":
|
|
device_id, attribute = entity_entry.unique_id.split(".")
|
|
if (
|
|
capability := BINARY_SENSOR_ATTRIBUTES_TO_CAPABILITIES.get(
|
|
attribute
|
|
)
|
|
) is None:
|
|
return None
|
|
new_unique_id = (
|
|
f"{device_id}_{MAIN}_{capability}_{attribute}_{attribute}"
|
|
)
|
|
return {
|
|
"new_unique_id": new_unique_id,
|
|
}
|
|
if entity_entry.domain in {"cover", "climate", "fan", "light", "lock"}:
|
|
return {"new_unique_id": f"{entity_entry.unique_id}_{MAIN}"}
|
|
if entity_entry.domain == "sensor":
|
|
delimiter = "." if " " not in entity_entry.unique_id else " "
|
|
if delimiter not in entity_entry.unique_id:
|
|
return None
|
|
device_id, attribute = entity_entry.unique_id.split(
|
|
delimiter, maxsplit=1
|
|
)
|
|
if (
|
|
capability := SENSOR_ATTRIBUTES_TO_CAPABILITIES.get(attribute)
|
|
) is None:
|
|
if attribute in {
|
|
"energy_meter",
|
|
"power_meter",
|
|
"deltaEnergy_meter",
|
|
"powerEnergy_meter",
|
|
"energySaved_meter",
|
|
}:
|
|
return {
|
|
"new_unique_id": f"{device_id}_{MAIN}_{Capability.POWER_CONSUMPTION_REPORT}_{Attribute.POWER_CONSUMPTION}_{attribute}",
|
|
}
|
|
if attribute in {
|
|
"X Coordinate",
|
|
"Y Coordinate",
|
|
"Z Coordinate",
|
|
}:
|
|
new_attribute = {
|
|
"X Coordinate": "x_coordinate",
|
|
"Y Coordinate": "y_coordinate",
|
|
"Z Coordinate": "z_coordinate",
|
|
}[attribute]
|
|
return {
|
|
"new_unique_id": f"{device_id}_{MAIN}_{Capability.THREE_AXIS}_{Attribute.THREE_AXIS}_{new_attribute}",
|
|
}
|
|
if attribute in {
|
|
Attribute.MACHINE_STATE,
|
|
Attribute.COMPLETION_TIME,
|
|
}:
|
|
capability = determine_machine_type(
|
|
hass, entry.entry_id, device_id
|
|
)
|
|
if capability is None:
|
|
return None
|
|
return {
|
|
"new_unique_id": f"{device_id}_{MAIN}_{capability}_{attribute}_{attribute}",
|
|
}
|
|
return None
|
|
return {
|
|
"new_unique_id": f"{device_id}_{MAIN}_{capability}_{attribute}_{attribute}",
|
|
}
|
|
|
|
if entity_entry.domain == "switch":
|
|
return {
|
|
"new_unique_id": f"{entity_entry.unique_id}_{MAIN}_{Capability.SWITCH}_{Attribute.SWITCH}_{Attribute.SWITCH}",
|
|
}
|
|
|
|
return None
|
|
|
|
await async_migrate_entries(hass, entry.entry_id, migrate_entities)
|
|
hass.config_entries.async_update_entry(
|
|
entry,
|
|
minor_version=2,
|
|
)
|
|
|
|
return True
|
|
|
|
|
|
def determine_machine_type(
|
|
hass: HomeAssistant,
|
|
entry_id: str,
|
|
device_id: str,
|
|
) -> Capability | None:
|
|
"""Determine the machine type for a device."""
|
|
entity_registry = er.async_get(hass)
|
|
entries = er.async_entries_for_config_entry(entity_registry, entry_id)
|
|
device_entries = [entry for entry in entries if device_id in entry.unique_id]
|
|
for entry in device_entries:
|
|
if Attribute.DISHWASHER_JOB_STATE in entry.unique_id:
|
|
return Capability.DISHWASHER_OPERATING_STATE
|
|
if Attribute.WASHER_JOB_STATE in entry.unique_id:
|
|
return Capability.WASHER_OPERATING_STATE
|
|
if Attribute.DRYER_JOB_STATE in entry.unique_id:
|
|
return Capability.DRYER_OPERATING_STATE
|
|
if Attribute.OVEN_JOB_STATE in entry.unique_id:
|
|
return Capability.OVEN_OPERATING_STATE
|
|
return None
|
|
|
|
|
|
def create_devices(
|
|
device_registry: dr.DeviceRegistry,
|
|
devices: dict[str, FullDevice],
|
|
entry: SmartThingsConfigEntry,
|
|
rooms: dict[str, str],
|
|
) -> None:
|
|
"""Create devices in the device registry."""
|
|
for device in sorted(
|
|
devices.values(), key=lambda d: d.device.parent_device_id or ""
|
|
):
|
|
kwargs: dict[str, Any] = {}
|
|
if device.device.hub is not None:
|
|
kwargs = {
|
|
ATTR_SW_VERSION: device.device.hub.firmware_version,
|
|
ATTR_MODEL: device.device.hub.hardware_type,
|
|
}
|
|
if device.device.hub.mac_address:
|
|
kwargs[ATTR_CONNECTIONS] = {
|
|
(dr.CONNECTION_NETWORK_MAC, device.device.hub.mac_address)
|
|
}
|
|
if device.device.parent_device_id and device.device.parent_device_id in devices:
|
|
kwargs[ATTR_VIA_DEVICE] = (DOMAIN, device.device.parent_device_id)
|
|
if (ocf := device.device.ocf) is not None:
|
|
kwargs.update(
|
|
{
|
|
ATTR_MANUFACTURER: ocf.manufacturer_name,
|
|
ATTR_MODEL: (
|
|
(ocf.model_number.split("|")[0]) if ocf.model_number else None
|
|
),
|
|
ATTR_HW_VERSION: ocf.hardware_version,
|
|
ATTR_SW_VERSION: ocf.firmware_version,
|
|
}
|
|
)
|
|
if (viper := device.device.viper) is not None:
|
|
kwargs.update(
|
|
{
|
|
ATTR_MANUFACTURER: viper.manufacturer_name,
|
|
ATTR_MODEL: viper.model_name,
|
|
ATTR_HW_VERSION: viper.hardware_version,
|
|
ATTR_SW_VERSION: viper.software_version,
|
|
}
|
|
)
|
|
device_registry.async_get_or_create(
|
|
config_entry_id=entry.entry_id,
|
|
identifiers={(DOMAIN, device.device.device_id)},
|
|
configuration_url="https://account.smartthings.com",
|
|
name=device.device.label,
|
|
suggested_area=(
|
|
rooms.get(device.device.room_id) if device.device.room_id else None
|
|
),
|
|
**kwargs,
|
|
)
|
|
|
|
|
|
KEEP_CAPABILITY_QUIRK: dict[
|
|
Capability | str, Callable[[dict[Attribute | str, Status]], bool]
|
|
] = {
|
|
Capability.DRYER_OPERATING_STATE: (
|
|
lambda status: status[Attribute.SUPPORTED_MACHINE_STATES].value is not None
|
|
),
|
|
Capability.WASHER_OPERATING_STATE: (
|
|
lambda status: status[Attribute.SUPPORTED_MACHINE_STATES].value is not None
|
|
),
|
|
Capability.DEMAND_RESPONSE_LOAD_CONTROL: lambda _: True,
|
|
}
|
|
|
|
|
|
def process_status(status: dict[str, ComponentStatus]) -> dict[str, ComponentStatus]:
|
|
"""Remove disabled capabilities from status."""
|
|
if (main_component := status.get(MAIN)) is None:
|
|
return status
|
|
if (
|
|
disabled_components_capability := main_component.get(
|
|
Capability.CUSTOM_DISABLED_COMPONENTS
|
|
)
|
|
) is not None:
|
|
disabled_components = cast(
|
|
list[str],
|
|
disabled_components_capability[Attribute.DISABLED_COMPONENTS].value,
|
|
)
|
|
if disabled_components is not None:
|
|
for component in disabled_components:
|
|
if component in status:
|
|
del status[component]
|
|
for component_status in status.values():
|
|
process_component_status(component_status)
|
|
return status
|
|
|
|
|
|
def process_component_status(status: ComponentStatus) -> None:
|
|
"""Remove disabled capabilities from component status."""
|
|
if (
|
|
disabled_capabilities_capability := status.get(
|
|
Capability.CUSTOM_DISABLED_CAPABILITIES
|
|
)
|
|
) is not None:
|
|
disabled_capabilities = cast(
|
|
list[Capability | str],
|
|
disabled_capabilities_capability[Attribute.DISABLED_CAPABILITIES].value,
|
|
)
|
|
if disabled_capabilities is not None:
|
|
for capability in disabled_capabilities:
|
|
if capability in status and (
|
|
capability not in KEEP_CAPABILITY_QUIRK
|
|
or not KEEP_CAPABILITY_QUIRK[capability](status[capability])
|
|
):
|
|
del status[capability]
|